This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new d53f44ee00 HDDS-8722. Improve thread names in Datanode. (#5491)
d53f44ee00 is described below
commit d53f44ee00aca509b050a58646ecaa6e5149291b
Author: Galsza <[email protected]>
AuthorDate: Thu Oct 26 20:19:19 2023 +0200
HDDS-8722. Improve thread names in Datanode. (#5491)
Co-authored-by: Arafat2198 <[email protected]>
---
.../container/common/report/ReportManager.java | 18 ++++++++----
.../common/statemachine/DatanodeStateMachine.java | 33 ++++++++++++++--------
.../common/statemachine/EndpointStateMachine.java | 6 ++--
.../common/statemachine/SCMConnectionManager.java | 15 ++++++----
.../common/statemachine/StateContext.java | 12 ++++++--
.../CloseContainerCommandHandler.java | 13 +++++----
.../commandhandler/DeleteBlocksCommandHandler.java | 11 ++++++--
.../DeleteContainerCommandHandler.java | 13 +++++----
.../common/states/datanode/InitDatanodeState.java | 5 ++--
.../common/transport/server/XceiverServerGrpc.java | 9 ++++--
.../server/ratis/ContainerStateMachine.java | 16 +++++++----
.../transport/server/ratis/XceiverServerRatis.java | 17 +++++------
.../common/volume/StorageVolumeChecker.java | 23 +++++++++------
.../ECReconstructionCoordinator.java | 15 +++++++---
.../ozone/container/ozoneimpl/OzoneContainer.java | 18 +++++++++---
.../container/replication/ReplicationServer.java | 25 +++++++++-------
.../replication/ReplicationSupervisor.java | 18 +++++++++---
.../ozone/container/common/ContainerTestUtils.java | 2 +-
.../container/common/TestDatanodeStateMachine.java | 2 +-
.../common/report/TestReportPublisher.java | 8 +++---
.../common/statemachine/TestStateContext.java | 18 ++++++------
.../TestCloseContainerCommandHandler.java | 2 +-
.../TestDeleteBlocksCommandHandler.java | 2 +-
.../TestDeleteContainerCommandHandler.java | 2 +-
.../states/datanode/TestRunningDatanodeState.java | 2 +-
.../states/endpoint/TestHeartbeatEndpointTask.java | 14 ++++-----
.../common/volume/TestPeriodicVolumeChecker.java | 3 +-
.../common/volume/TestStorageVolumeChecker.java | 4 +--
.../common/volume/TestVolumeSetDiskChecks.java | 4 +--
.../replication/TestReplicationSupervisor.java | 2 +-
.../ozone/container/common/TestEndPoint.java | 2 +-
.../hdds/scm/storage/TestContainerCommandsEC.java | 31 ++++++++++++--------
32 files changed, 229 insertions(+), 136 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
index 97d7fe2d93..146138f88a 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
@@ -49,17 +49,18 @@ public final class ReportManager {
* Construction of {@link ReportManager} should be done via
* {@link ReportManager.Builder}.
*
- * @param context StateContext which holds the report
+ * @param context StateContext which holds the report
* @param publishers List of publishers which generates report
*/
- private ReportManager(StateContext context,
- List<ReportPublisher> publishers) {
+ private ReportManager(StateContext context, List<ReportPublisher> publishers,
+ String threadNamePrefix) {
this.context = context;
this.publishers = publishers;
this.executorService = HadoopExecutors.newScheduledThreadPool(
publishers.size(),
new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Datanode ReportManager Thread - %d").build());
+ .setNameFormat(threadNamePrefix +
+ "DatanodeReportManager-%d").build());
}
/**
@@ -103,6 +104,7 @@ public final class ReportManager {
private StateContext stateContext;
private List<ReportPublisher> reportPublishers;
private ReportPublisherFactory publisherFactory;
+ private String threadNamePrefix = "";
private Builder(ConfigurationSource conf) {
@@ -146,6 +148,11 @@ public final class ReportManager {
return this;
}
+ public Builder addThreadNamePrefix(String threadPrefix) {
+ this.threadNamePrefix = threadPrefix;
+ return this;
+ }
+
/**
* Build and returns ReportManager.
*
@@ -153,7 +160,8 @@ public final class ReportManager {
*/
public ReportManager build() {
Preconditions.checkNotNull(stateContext);
- return new ReportManager(stateContext, reportPublishers);
+ return new ReportManager(
+ stateContext, reportPublishers, threadNamePrefix);
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index a1ee46d754..51290cf80d 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -23,6 +23,7 @@ import java.time.ZoneId;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
@@ -167,12 +168,16 @@ public class DatanodeStateMachine implements Closeable {
VersionedDatanodeFeatures.initialize(layoutVersionManager);
this.dnCRLStore = crlStore;
+ String threadNamePrefix = datanodeDetails.threadNamePrefix();
executorService = Executors.newFixedThreadPool(
getEndPointTaskThreadPoolSize(),
new ThreadFactoryBuilder()
- .setNameFormat("Datanode State Machine Task Thread - %d").build());
+ .setNameFormat(threadNamePrefix +
+ "DatanodeStateMachineTaskThread-%d")
+ .build());
connectionManager = new SCMConnectionManager(conf);
- context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
+ context = new StateContext(this.conf, DatanodeStates.getInitState(), this,
+ threadNamePrefix);
// OzoneContainer instance is used in a non-thread safe way by the context
// past to its constructor, so we much synchronize its access. See
// HDDS-3116 for more details.
@@ -216,32 +221,35 @@ public class DatanodeStateMachine implements Closeable {
ecReconstructionMetrics = ECReconstructionMetrics.create();
ecReconstructionCoordinator = new ECReconstructionCoordinator(
- conf, certClient, secretKeyClient, context, ecReconstructionMetrics);
+ conf, certClient, secretKeyClient, context, ecReconstructionMetrics,
+ threadNamePrefix);
// This is created as an instance variable as Mockito needs to access it in
// a test. The test mocks it in a running mini-cluster.
reconstructECContainersCommandHandler =
new ReconstructECContainersCommandHandler(conf, supervisor,
- ecReconstructionCoordinator);
+ ecReconstructionCoordinator);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat(threadNamePrefix + "PipelineCommandHandlerThread-%d")
+ .build();
pipelineCommandExecutorService = Executors
- .newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("PipelineCommandHandlerThread-%d").build());
+ .newSingleThreadExecutor(threadFactory);
// When we add new handlers just adding a new handler here should do the
// trick.
commandDispatcher = CommandDispatcher.newBuilder()
.addHandler(new CloseContainerCommandHandler(
dnConf.getContainerCloseThreads(),
- dnConf.getCommandQueueLimit()))
+ dnConf.getCommandQueueLimit(), threadNamePrefix))
.addHandler(new DeleteBlocksCommandHandler(getContainer(),
- conf, dnConf))
+ conf, dnConf, threadNamePrefix))
.addHandler(new ReplicateContainerCommandHandler(conf, supervisor,
pullReplicatorWithMetrics, pushReplicatorWithMetrics))
.addHandler(reconstructECContainersCommandHandler)
.addHandler(new DeleteContainerCommandHandler(
dnConf.getContainerDeleteThreads(), clock,
- dnConf.getCommandQueueLimit()))
+ dnConf.getCommandQueueLimit(), threadNamePrefix))
.addHandler(
new ClosePipelineCommandHandler(pipelineCommandExecutorService))
.addHandler(new CreatePipelineCommandHandler(conf,
@@ -262,6 +270,7 @@ public class DatanodeStateMachine implements Closeable {
.addPublisherFor(CommandStatusReportsProto.class)
.addPublisherFor(PipelineReportsProto.class)
.addPublisherFor(CRLStatusReport.class)
+ .addThreadNamePrefix(threadNamePrefix)
.build();
queueMetrics = DatanodeQueueMetrics.create(this);
@@ -551,7 +560,8 @@ public class DatanodeStateMachine implements Closeable {
};
stateMachineThread = new ThreadFactoryBuilder()
.setDaemon(true)
- .setNameFormat("Datanode State Machine Daemon Thread")
+ .setNameFormat(datanodeDetails.threadNamePrefix() +
+ "DatanodeStateMachineDaemonThread")
.setUncaughtExceptionHandler((Thread t, Throwable ex) -> {
String message = "Terminate Datanode, encounter uncaught exception"
+ " in Datanode State Machine Thread";
@@ -689,7 +699,8 @@ public class DatanodeStateMachine implements Closeable {
private Thread getCommandHandlerThread(Runnable processCommandQueue) {
Thread handlerThread = new Thread(processCommandQueue);
handlerThread.setDaemon(true);
- handlerThread.setName("Command processor thread");
+ handlerThread.setName(
+ datanodeDetails.threadNamePrefix() + "CommandProcessorThread");
handlerThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
// Let us just restart this thread after logging a critical error.
// if this thread is not running we cannot handle commands from SCM.
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java
index e687b956c1..0b9005422e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java
@@ -68,7 +68,7 @@ public class EndpointStateMachine
*/
public EndpointStateMachine(InetSocketAddress address,
StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint,
- ConfigurationSource conf) {
+ ConfigurationSource conf, String threadNamePrefix) {
this.endPoint = endPoint;
this.missedCount = new AtomicLong(0);
this.address = address;
@@ -77,8 +77,8 @@ public class EndpointStateMachine
this.conf = conf;
executorService = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
- .setNameFormat("EndpointStateMachine task thread for "
- + this.address + " - %d ")
+ .setNameFormat(threadNamePrefix + "EndpointStateMachineTaskThread-"
+ + this.address + "-%d ")
.build());
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
index e68b453ce7..f35415820e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
@@ -132,7 +132,8 @@ public class SCMConnectionManager
* @param address - Address of the SCM machine to send heartbeat to.
* @throws IOException
*/
- public void addSCMServer(InetSocketAddress address) throws IOException {
+ public void addSCMServer(InetSocketAddress address,
+ String threadNamePrefix) throws IOException {
writeLock();
try {
if (scmMachines.containsKey(address)) {
@@ -163,10 +164,10 @@ public class SCMConnectionManager
StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
new StorageContainerDatanodeProtocolClientSideTranslatorPB(
- rpcProxy);
+ rpcProxy);
- EndpointStateMachine endPoint =
- new EndpointStateMachine(address, rpcClient, this.conf);
+ EndpointStateMachine endPoint = new EndpointStateMachine(address,
+ rpcClient, this.conf, threadNamePrefix);
endPoint.setPassive(false);
scmMachines.put(address, endPoint);
} finally {
@@ -176,10 +177,12 @@ public class SCMConnectionManager
/**
* Adds a new Recon server to the set of endpoints.
+ *
* @param address Recon address.
* @throws IOException
*/
- public void addReconServer(InetSocketAddress address) throws IOException {
+ public void addReconServer(InetSocketAddress address,
+ String threadNamePrefix) throws IOException {
LOG.info("Adding Recon Server : {}", address.toString());
writeLock();
try {
@@ -209,7 +212,7 @@ public class SCMConnectionManager
new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
EndpointStateMachine endPoint =
- new EndpointStateMachine(address, rpcClient, conf);
+ new EndpointStateMachine(address, rpcClient, conf, threadNamePrefix);
endPoint.setPassive(true);
scmMachines.put(address, endPoint);
} finally {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index f6075a620e..b29db88890 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -152,8 +152,11 @@ public class StateContext {
private final AtomicLong heartbeatFrequency = new AtomicLong(2000);
private final AtomicLong reconHeartbeatFrequency = new AtomicLong(2000);
-
+
private final int maxCommandQueueLimit;
+
+ private final String threadNamePrefix;
+
/**
* Constructs a StateContext.
*
@@ -163,7 +166,7 @@ public class StateContext {
*/
public StateContext(ConfigurationSource conf,
DatanodeStateMachine.DatanodeStates
- state, DatanodeStateMachine parent) {
+ state, DatanodeStateMachine parent, String threadNamePrefix) {
this.conf = conf;
DatanodeConfiguration dnConf =
conf.getObject(DatanodeConfiguration.class);
@@ -187,6 +190,7 @@ public class StateContext {
isFullReportReadyToBeSent = new HashMap<>();
fullReportTypeList = new ArrayList<>();
type2Reports = new HashMap<>();
+ this.threadNamePrefix = threadNamePrefix;
initReportTypeCollection();
}
@@ -968,4 +972,8 @@ public class StateContext {
public DatanodeQueueMetrics getQueueMetrics() {
return parentDatanodeStateMachine.getQueueMetrics();
}
+
+ public String getThreadNamePrefix() {
+ return threadNamePrefix;
+ }
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index b34082550f..7b739b3733 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -65,13 +65,14 @@ public class CloseContainerCommandHandler implements CommandHandler {
* Constructs a ContainerReport handler.
*/
public CloseContainerCommandHandler(
- int threadPoolSize, int queueSize) {
+ int threadPoolSize, int queueSize, String threadNamePrefix) {
executor = new ThreadPoolExecutor(
- threadPoolSize, threadPoolSize,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(queueSize),
- new ThreadFactoryBuilder()
- .setNameFormat("CloseContainerThread-%d").build());
+ threadPoolSize, threadPoolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(queueSize),
+ new ThreadFactoryBuilder()
+ .setNameFormat(threadNamePrefix + "CloseContainerThread-%d")
+ .build());
}
/**
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index d29162dd88..0b5b5a68b7 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -69,6 +69,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -100,7 +101,8 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
private final Map<String, SchemaHandler> schemaHandlers;
public DeleteBlocksCommandHandler(OzoneContainer container,
- ConfigurationSource conf, DatanodeConfiguration dnConf) {
+ ConfigurationSource conf, DatanodeConfiguration dnConf,
+ String threadNamePrefix) {
this.ozoneContainer = container;
this.containerSet = container.getContainerSet();
this.conf = conf;
@@ -111,9 +113,12 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
schemaHandlers.put(SCHEMA_V2, this::markBlocksForDeletionSchemaV2);
schemaHandlers.put(SCHEMA_V3, this::markBlocksForDeletionSchemaV3);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat(threadNamePrefix +
+ "DeleteBlocksCommandHandlerThread-%d")
+ .build();
this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(
- dnConf.getBlockDeleteThreads(), new ThreadFactoryBuilder()
- .setNameFormat("DeleteBlocksCommandHandlerThread-%d").build());
+ dnConf.getBlockDeleteThreads(), threadFactory);
this.deleteCommandQueues =
new LinkedBlockingQueue<>(dnConf.getBlockDeleteQueueLimit());
handlerThread = new Daemon(new DeleteCmdWorker());
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
index f2e9b748db..ead81c32e5 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
@@ -58,13 +58,14 @@ public class DeleteContainerCommandHandler implements CommandHandler {
private int maxQueueSize;
public DeleteContainerCommandHandler(
- int threadPoolSize, Clock clock, int queueSize) {
+ int threadPoolSize, Clock clock, int queueSize, String threadNamePrefix) {
this(clock, new ThreadPoolExecutor(
- threadPoolSize, threadPoolSize,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(queueSize),
- new ThreadFactoryBuilder()
- .setNameFormat("DeleteContainerThread-%d").build()),
+ threadPoolSize, threadPoolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(queueSize),
+ new ThreadFactoryBuilder()
+ .setNameFormat(threadNamePrefix + "DeleteContainerThread-%d")
+ .build()),
queueSize);
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
index 290d39f88e..0b4fbfe2fa 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
@@ -101,12 +101,13 @@ public class InitDatanodeState implements DatanodeState,
}
}
for (InetSocketAddress addr : addresses) {
- connectionManager.addSCMServer(addr);
+ connectionManager.addSCMServer(addr, context.getThreadNamePrefix());
this.context.addEndpoint(addr);
}
InetSocketAddress reconAddress = getReconAddresses(conf);
if (reconAddress != null) {
- connectionManager.addReconServer(reconAddress);
+ connectionManager.addReconServer(reconAddress,
+ context.getThreadNamePrefix());
this.context.addEndpoint(reconAddress);
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index f5fcb6eacf..b421177b44 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -114,11 +114,14 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("ChunkReader-%d")
+ .setNameFormat(datanodeDetails.threadNamePrefix() +
+ "ChunkReader-%d")
.build());
- ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("ChunkReader-ELG-%d")
+ ThreadFactory factory = new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat(datanodeDetails.threadNamePrefix() +
+ "ChunkReader-ELG-%d")
.build();
if (Epoll.isAvailable()) {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index ae32df6f45..354d203328 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@@ -185,10 +186,13 @@ public class ContainerStateMachine extends BaseStateMachine {
private final CSMMetrics metrics;
@SuppressWarnings("parameternumber")
- public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
+ public ContainerStateMachine(RaftGroupId gid,
+ ContainerDispatcher dispatcher,
ContainerController containerController,
List<ThreadPoolExecutor> chunkExecutors,
- XceiverServerRatis ratisServer, ConfigurationSource conf) {
+ XceiverServerRatis ratisServer,
+ ConfigurationSource conf,
+ String threadNamePrefix) {
this.gid = gid;
this.dispatcher = dispatcher;
this.containerController = containerController;
@@ -226,10 +230,12 @@ public class ContainerStateMachine extends BaseStateMachine {
applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
stateMachineHealthy = new AtomicBoolean(true);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat(
+ threadNamePrefix + "ContainerOp-" + gid.getUuid() + "-%d")
+ .build();
this.executor = Executors.newFixedThreadPool(numContainerOpExecutors,
- new ThreadFactoryBuilder()
- .setNameFormat("ContainerOp-" + gid.getUuid() + "-%d")
- .build());
+ threadFactory);
this.waitOnBothFollowers = conf.getObject(
DatanodeConfiguration.class).waitOnAllFollowers();
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 932526aa88..5c53eb6b95 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -167,13 +167,14 @@ public final class XceiverServerRatis implements XceiverServerSpi {
this.dispatcher = dispatcher;
this.containerController = containerController;
this.raftPeerId = RatisHelper.toRaftPeerId(dd);
- chunkExecutors = createChunkExecutors(conf);
+ String threadNamePrefix = datanodeDetails.threadNamePrefix();
+ chunkExecutors = createChunkExecutors(conf, threadNamePrefix);
nodeFailureTimeoutMs =
- conf.getObject(DatanodeRatisServerConfig.class)
- .getFollowerSlownessTimeout();
+ conf.getObject(DatanodeRatisServerConfig.class)
+ .getFollowerSlownessTimeout();
shouldDeleteRatisLogDirectory =
- conf.getObject(DatanodeRatisServerConfig.class)
- .shouldDeleteRatisLogDirectory();
+ conf.getObject(DatanodeRatisServerConfig.class)
+ .shouldDeleteRatisLogDirectory();
RaftServer.Builder builder =
RaftServer.newBuilder().setServerId(raftPeerId)
@@ -215,7 +216,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
private ContainerStateMachine getStateMachine(RaftGroupId gid) {
return new ContainerStateMachine(gid, dispatcher, containerController,
- chunkExecutors, this, conf);
+ chunkExecutors, this, conf, datanodeDetails.threadNamePrefix());
}
private void setUpRatisStream(RaftProperties properties) {
@@ -920,7 +921,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
}
private static List<ThreadPoolExecutor> createChunkExecutors(
- ConfigurationSource conf) {
+ ConfigurationSource conf, String threadNamePrefix) {
// TODO create single pool with N threads if using non-incremental chunks
final int threadCountPerDisk = conf.getInt(
OzoneConfigKeys
@@ -936,7 +937,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
for (int i = 0; i < executors.length; i++) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
- .setNameFormat("ChunkWriter-" + i + "-%d")
+ .setNameFormat(threadNamePrefix + "ChunkWriter-" + i + "-%d")
.build();
BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>();
executors[i] = new ThreadPoolExecutor(1, 1,
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
index bcb11fddf0..eddc77f18e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -107,7 +108,8 @@ public class StorageVolumeChecker {
* @param conf Configuration object.
* @param timer {@link Timer} object used for throttling checks.
*/
- public StorageVolumeChecker(ConfigurationSource conf, Timer timer) {
+ public StorageVolumeChecker(ConfigurationSource conf, Timer timer,
+ String threadNamePrefix) {
this.timer = timer;
@@ -125,22 +127,25 @@ public class StorageVolumeChecker {
timer, minDiskCheckGapMs, maxAllowedTimeForCheckMs,
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
- .setNameFormat("DataNode DiskChecker thread %d")
+ .setNameFormat(threadNamePrefix + "DataNodeDiskChecker" +
+ "Thread-%d")
.setDaemon(true)
.build()));
checkVolumeResultHandlerExecutorService = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
- .setNameFormat("VolumeCheck ResultHandler thread %d")
+ .setNameFormat(threadNamePrefix + "VolumeCheckResultHandler" +
+ "Thread-%d")
.setDaemon(true)
.build());
- this.diskCheckerservice = Executors.newScheduledThreadPool(
- 1, r -> {
- Thread t = new Thread(r, "Periodic HDDS volume checker");
- t.setDaemon(true);
- return t;
- });
+ ThreadFactory threadFactory = r -> {
+ Thread t = new Thread(r, threadNamePrefix + "PeriodicHDDSVolumeChecker");
+ t.setDaemon(true);
+ return t;
+ };
+ this.diskCheckerservice = Executors.newSingleThreadScheduledExecutor(
+ threadFactory);
started = new AtomicBoolean(false);
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 892dbaf22b..80348dbe45 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -69,6 +69,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -114,17 +115,23 @@ public class ECReconstructionCoordinator implements Closeable {
public ECReconstructionCoordinator(
ConfigurationSource conf, CertificateClient certificateClient,
SecretKeySignerClient secretKeyClient, StateContext context,
- ECReconstructionMetrics metrics) throws IOException {
+ ECReconstructionMetrics metrics,
+ String threadNamePrefix) throws IOException {
this.context = context;
this.containerOperationClient = new ECContainerOperationClient(conf,
certificateClient);
this.byteBufferPool = new ElasticByteBufferPool();
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat(threadNamePrefix + "ec-reconstruct-reader-TID-%d")
+ .build();
this.ecReconstructExecutor =
new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
conf.getObject(OzoneClientConfig.class)
- .getEcReconstructStripeReadPoolLimit(), 60, TimeUnit.SECONDS,
- new SynchronousQueue<>(), new ThreadFactoryBuilder()
- .setNameFormat("ec-reconstruct-reader-TID-%d").build(),
+ .getEcReconstructStripeReadPoolLimit(),
+ 60,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy());
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
.getInstance(byteBufferPool, () -> ecReconstructExecutor);
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 404297270a..1e34fb1049 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
@@ -71,6 +72,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -141,7 +143,8 @@ public class OzoneContainer {
config = conf;
this.datanodeDetails = datanodeDetails;
this.context = context;
- this.volumeChecker = new StorageVolumeChecker(conf, new Timer());
+ this.volumeChecker = new StorageVolumeChecker(conf, new Timer(),
+ datanodeDetails.threadNamePrefix());
volumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
context, VolumeType.DATA_VOLUME, volumeChecker);
@@ -211,7 +214,8 @@ public class OzoneContainer {
secConf,
certClient,
new ContainerImporter(conf, containerSet, controller,
- volumeSet));
+ volumeSet),
+ datanodeDetails.threadNamePrefix());
readChannel = new XceiverServerGrpc(
datanodeDetails, config, hddsDispatcher, certClient);
@@ -292,10 +296,16 @@ public class OzoneContainer {
// system properties set. These can inspect and possibly repair
// containers as we iterate them here.
ContainerInspectorUtil.load();
+ String threadNamePrefix = datanodeDetails.threadNamePrefix();
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat(threadNamePrefix + "ContainerReader-%d")
+ .build();
while (volumeSetIterator.hasNext()) {
StorageVolume volume = volumeSetIterator.next();
- Thread thread = new Thread(new ContainerReader(volumeSet,
- (HddsVolume) volume, containerSet, config, true));
+ ContainerReader containerReader = new ContainerReader(volumeSet,
+ (HddsVolume) volume, containerSet, config, true);
+ Thread thread = threadFactory.newThread(containerReader);
thread.start();
volumeThreads.add(thread);
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index a2e0209f64..3feb574748 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.replication;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -69,7 +70,8 @@ public class ReplicationServer {
public ReplicationServer(ContainerController controller,
ReplicationConfig replicationConfig, SecurityConfig secConf,
- CertificateClient caClient, ContainerImporter importer) {
+ CertificateClient caClient, ContainerImporter importer,
+ String threadNamePrefix) {
this.secConf = secConf;
this.caClient = caClient;
this.controller = controller;
@@ -81,17 +83,20 @@ public class ReplicationServer {
int replicationQueueLimit =
replicationConfig.getReplicationQueueLimit();
LOG.info("Initializing replication server with thread count = {}"
- + " queue length = {}",
+ + " queue length = {}",
replicationConfig.getReplicationMaxStreams(),
replicationConfig.getReplicationQueueLimit());
- this.executor =
- new ThreadPoolExecutor(replicationServerWorkers,
- replicationServerWorkers,
- 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(replicationQueueLimit),
- new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("ReplicationContainerReader-%d")
- .build());
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat(threadNamePrefix + "ReplicationContainerReader-%d")
+ .build();
+ this.executor = new ThreadPoolExecutor(
+ replicationServerWorkers,
+ replicationServerWorkers,
+ 60,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(replicationQueueLimit),
+ threadFactory);
init();
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index b2a38df804..ee51463309 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -103,7 +104,9 @@ public final class ReplicationSupervisor {
private DatanodeConfiguration datanodeConfig;
private ExecutorService executor;
private Clock clock;
- private IntConsumer executorThreadUpdater = threadCount -> { };
+ private IntConsumer executorThreadUpdater = threadCount -> {
+ };
+ private String threadNamePrefix;
public Builder clock(Clock newClock) {
clock = newClock;
@@ -135,6 +138,11 @@ public final class ReplicationSupervisor {
return this;
}
+ public Builder threadNamePrefix(String threadPrefix) {
+ this.threadNamePrefix = threadPrefix;
+ return this;
+ }
+
public ReplicationSupervisor build() {
if (replicationConfig == null || datanodeConfig == null) {
ConfigurationSource conf = new OzoneConfiguration();
@@ -154,14 +162,16 @@ public final class ReplicationSupervisor {
if (executor == null) {
LOG.info("Initializing replication supervisor with thread count = {}",
replicationConfig.getReplicationMaxStreams());
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat(threadNamePrefix + "ContainerReplicationThread-%d")
+ .build();
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
replicationConfig.getReplicationMaxStreams(),
replicationConfig.getReplicationMaxStreams(),
60, TimeUnit.SECONDS,
new PriorityBlockingQueue<>(),
- new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("ContainerReplicationThread-%d")
- .build());
+ threadFactory);
executor = tpe;
executorThreadUpdater = threadCount -> {
if (threadCount < tpe.getCorePoolSize()) {
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index 08476300c9..092b7a84aa 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -110,7 +110,7 @@ public final class ContainerTestUtils {
StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
return new EndpointStateMachine(address, rpcClient,
- new LegacyHadoopConfigurationSource(conf));
+ new LegacyHadoopConfigurationSource(conf), "");
}
public static OzoneContainer getOzoneContainer(
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index 630308be22..914057f70a 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -120,7 +120,7 @@ public class TestDatanodeStateMachine {
conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ID_DIR, path);
executorService = HadoopExecutors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Test Data Node State Machine Thread - %d").build());
+ .setNameFormat("TestDataNodeStateMachineThread-%d").build());
}
@AfterEach
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index d611caf126..9fb9c7251c 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -109,7 +109,7 @@ public class TestReportPublisher {
ScheduledExecutorService executorService = HadoopExecutors
.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Unit test ReportManager Thread - %d").build());
+ .setNameFormat("TestReportManagerThread-%d").build());
publisher.init(dummyContext, executorService);
Thread.sleep(150);
Assertions.assertEquals(1,
@@ -131,7 +131,7 @@ public class TestReportPublisher {
ScheduledExecutorService executorService = HadoopExecutors
.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Unit test ReportManager Thread - %d").build());
+ .setNameFormat("TestReportManagerThread-%d").build());
publisher.init(dummyContext, executorService);
Thread.sleep(150);
executorService.shutdown();
@@ -155,7 +155,7 @@ public class TestReportPublisher {
ScheduledExecutorService executorService = HadoopExecutors
.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Unit test ReportManager Thread - %d").build());
+ .setNameFormat("TestReportManagerThread-%d").build());
publisher.init(dummyContext, executorService);
Assertions.assertNull(
((CommandStatusReportPublisher) publisher).getReport());
@@ -200,7 +200,7 @@ public class TestReportPublisher {
ScheduledExecutorService executorService = HadoopExecutors
.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Unit test ReportManager Thread - %d").build());
+ .setNameFormat("TestReportManagerThread-%d").build());
publisher.init(dummyContext, executorService);
Message report =
((CRLStatusReportPublisher) publisher).getReport();
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index a0e66982de..2b20fbce7a 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -85,7 +85,7 @@ public class TestStateContext {
mock(DatanodeStateMachine.class);
StateContext ctx = new StateContext(conf, DatanodeStates.getInitState(),
- datanodeStateMachineMock);
+ datanodeStateMachineMock, "");
InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
ctx.addEndpoint(scm1);
InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
@@ -300,7 +300,7 @@ public class TestStateContext {
private StateContext newStateContext(OzoneConfiguration conf,
DatanodeStateMachine datanodeStateMachineMock) {
StateContext stateContext = new StateContext(conf,
- DatanodeStates.getInitState(), datanodeStateMachineMock);
+ DatanodeStates.getInitState(), datanodeStateMachineMock, "");
InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
stateContext.addEndpoint(scm1);
InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
@@ -328,7 +328,7 @@ public class TestStateContext {
DatanodeStateMachine datanodeStateMachineMock =
mock(DatanodeStateMachine.class);
StateContext stateContext = new StateContext(conf,
- DatanodeStates.getInitState(), datanodeStateMachineMock);
+ DatanodeStates.getInitState(), datanodeStateMachineMock, "");
InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
@@ -365,7 +365,7 @@ public class TestStateContext {
DatanodeStateMachine datanodeStateMachineMock =
mock(DatanodeStateMachine.class);
StateContext stateContext = new StateContext(conf,
- DatanodeStates.getInitState(), datanodeStateMachineMock);
+ DatanodeStates.getInitState(), datanodeStateMachineMock, "");
InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
@@ -429,7 +429,7 @@ public class TestStateContext {
final AtomicBoolean taskExecuted = new AtomicBoolean();
StateContext subject = new StateContext(new OzoneConfiguration(),
- DatanodeStates.getInitState(), mock(DatanodeStateMachine.class)) {
+ DatanodeStates.getInitState(), mock(DatanodeStateMachine.class), "") {
@Override
public DatanodeState<DatanodeStates> getTask() {
// this task waits until {@code subject} is shutdown
@@ -478,7 +478,7 @@ public class TestStateContext {
@Test
public void testIsThreadPoolAvailable() throws Exception {
StateContext stateContext = new StateContext(
- new OzoneConfiguration(), null, null);
+ new OzoneConfiguration(), null, null, "");
int threadPoolSize = 2;
ExecutorService executorService = Executors.newFixedThreadPool(
@@ -514,7 +514,7 @@ public class TestStateContext {
executorService.submit((Callable<String>) future::get);
StateContext subject = new StateContext(new OzoneConfiguration(),
- DatanodeStates.INIT, mock(DatanodeStateMachine.class)) {
+ DatanodeStates.INIT, mock(DatanodeStateMachine.class), "") {
@Override
public DatanodeState<DatanodeStates> getTask() {
// this task counts the number of execute() and await() calls
@@ -564,7 +564,7 @@ public class TestStateContext {
mock(DatanodeStateMachine.class);
StateContext ctx = new StateContext(conf, DatanodeStates.getInitState(),
- datanodeStateMachineMock);
+ datanodeStateMachineMock, "");
InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
ctx.addEndpoint(scm1);
InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
@@ -678,7 +678,7 @@ public class TestStateContext {
StorageContainerDatanodeProtocolProtos
.ContainerReportsProto.getDefaultInstance());
return new StateContext(conf, DatanodeStates.getInitState(),
- datanodeStateMachineMock);
+ datanodeStateMachineMock, "");
}
private static SCMCommand<?> someCommand() {
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
index d70c7af0d3..d286016946 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
@@ -70,7 +70,7 @@ public class TestCloseContainerCommandHandler {
private ContainerController controller;
private ContainerSet containerSet;
private CloseContainerCommandHandler subject =
- new CloseContainerCommandHandler(1, 1000);
+ new CloseContainerCommandHandler(1, 1000, "");
private final ContainerLayoutVersion layout;
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java
index a33341b1f9..6cadaae192 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java
@@ -124,7 +124,7 @@ public class TestDeleteBlocksCommandHandler {
conf.getObject(DatanodeConfiguration.class);
handler = spy(new DeleteBlocksCommandHandler(
- ozoneContainer, conf, dnConf));
+ ozoneContainer, conf, dnConf, ""));
blockDeleteMetrics = handler.getBlockDeleteMetrics();
TestSchemaHandler testSchemaHandler1 = Mockito.spy(new TestSchemaHandler());
TestSchemaHandler testSchemaHandler2 = Mockito.spy(new TestSchemaHandler());
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
index a1d2685147..06002009c5 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
@@ -168,7 +168,7 @@ public class TestDeleteContainerCommandHandler {
private static DeleteContainerCommandHandler createSubjectWithPoolSize(
TestClock clock, int queueSize) {
- return new DeleteContainerCommandHandler(1, clock, queueSize);
+ return new DeleteContainerCommandHandler(1, clock, queueSize, "");
}
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java
index 2ac6dca922..76b4cd907c 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java
@@ -57,7 +57,7 @@ public class TestRunningDatanodeState {
state.setExecutorCompletionService(ecs);
for (int i = 0; i < threadPoolSize; i++) {
- stateMachines.add(new EndpointStateMachine(null, null, null));
+ stateMachines.add(new EndpointStateMachine(null, null, null, ""));
}
CompletableFuture<EndpointStateMachine.EndPointStates> futureOne =
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index a9a933d4c9..d0b9e80f3c 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -95,7 +95,7 @@ public class TestHeartbeatEndpointTask {
DatanodeStateMachine datanodeStateMachine =
Mockito.mock(DatanodeStateMachine.class);
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- datanodeStateMachine);
+ datanodeStateMachine, "");
// WHEN
HeartbeatEndpointTask task = getHeartbeatEndpointTask(conf, context, scm);
@@ -125,7 +125,7 @@ public class TestHeartbeatEndpointTask {
OzoneConfiguration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- Mockito.mock(DatanodeStateMachine.class));
+ Mockito.mock(DatanodeStateMachine.class), "");
context.setTermOfLeaderSCM(1);
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
@@ -145,7 +145,7 @@ public class TestHeartbeatEndpointTask {
public void testheartbeatWithNodeReports() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- Mockito.mock(DatanodeStateMachine.class));
+ Mockito.mock(DatanodeStateMachine.class), "");
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
Mockito.mock(
@@ -177,7 +177,7 @@ public class TestHeartbeatEndpointTask {
public void testheartbeatWithContainerReports() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- Mockito.mock(DatanodeStateMachine.class));
+ Mockito.mock(DatanodeStateMachine.class), "");
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
Mockito.mock(
@@ -209,7 +209,7 @@ public class TestHeartbeatEndpointTask {
public void testheartbeatWithCommandStatusReports() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- Mockito.mock(DatanodeStateMachine.class));
+ Mockito.mock(DatanodeStateMachine.class), "");
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
Mockito.mock(
@@ -242,7 +242,7 @@ public class TestHeartbeatEndpointTask {
public void testheartbeatWithContainerActions() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- Mockito.mock(DatanodeStateMachine.class));
+ Mockito.mock(DatanodeStateMachine.class), "");
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
Mockito.mock(
@@ -276,7 +276,7 @@ public class TestHeartbeatEndpointTask {
DatanodeStateMachine datanodeStateMachine =
Mockito.mock(DatanodeStateMachine.class);
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- datanodeStateMachine);
+ datanodeStateMachine, "");
// Return a Map of command counts when the heartbeat logic requests it
final Map<SCMCommandProto.Type, Integer> commands = new HashMap<>();
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestPeriodicVolumeChecker.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestPeriodicVolumeChecker.java
index d77dda76ee..1465f954d5 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestPeriodicVolumeChecker.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestPeriodicVolumeChecker.java
@@ -89,7 +89,8 @@ public class TestPeriodicVolumeChecker {
FakeTimer timer = new FakeTimer();
- StorageVolumeChecker volumeChecker = new StorageVolumeChecker(conf, timer);
+ StorageVolumeChecker volumeChecker = new StorageVolumeChecker(conf, timer,
+ "");
try {
volumeChecker.registerVolumeSet(new ImmutableVolumeSet(makeVolumes(
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java
index 423cea0ea8..29804e1e7e 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java
@@ -156,7 +156,7 @@ public class TestStorageVolumeChecker {
LOG.info("Executing {}", testName.getMethodName());
final HddsVolume volume = makeVolumes(1, expectedVolumeHealth).get(0);
final StorageVolumeChecker checker =
- new StorageVolumeChecker(new OzoneConfiguration(), new FakeTimer());
+ new StorageVolumeChecker(new OzoneConfiguration(), new FakeTimer(), "");
checker.setDelegateChecker(new DummyChecker());
final AtomicLong numCallbackInvocations = new AtomicLong(0);
@@ -200,7 +200,7 @@ public class TestStorageVolumeChecker {
final List<HddsVolume> volumes = makeVolumes(
NUM_VOLUMES, expectedVolumeHealth);
final StorageVolumeChecker checker =
- new StorageVolumeChecker(new OzoneConfiguration(), new FakeTimer());
+ new StorageVolumeChecker(new OzoneConfiguration(), new FakeTimer(), "");
checker.setDelegateChecker(new DummyChecker());
Set<? extends StorageVolume> failedVolumes =
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
index bddbbb4143..973d2ae6ae 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
@@ -325,7 +325,7 @@ public class TestVolumeSetDiskChecks {
StateContext stateContext = new StateContext(
new OzoneConfiguration(), DatanodeStateMachine
.DatanodeStates.getInitState(),
- datanodeStateMachineMock);
+ datanodeStateMachineMock, "");
InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
stateContext.addEndpoint(scm1);
when(datanodeStateMachineMock.getContainer()).thenReturn(ozoneContainer);
@@ -373,7 +373,7 @@ public class TestVolumeSetDiskChecks {
DummyChecker(ConfigurationSource conf, Timer timer, int numBadVolumes)
throws DiskErrorException {
- super(conf, timer);
+ super(conf, timer, "");
this.numBadVolumes = numBadVolumes;
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index 739a4f5965..0065245674 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -131,7 +131,7 @@ public class TestReplicationSupervisor {
context = new StateContext(
new OzoneConfiguration(),
DatanodeStateMachine.DatanodeStates.getInitState(),
- stateMachine);
+ stateMachine, "");
context.setTermOfLeaderSCM(CURRENT_TERM);
datanode = MockDatanodeDetails.randomDatanodeDetails();
Mockito.when(stateMachine.getDatanodeDetails()).thenReturn(datanode);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index c202b46ca2..9f3d9c93a8 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -575,7 +575,7 @@ public class TestEndPoint {
final StateContext stateContext =
new StateContext(conf, DatanodeStateMachine.DatanodeStates.RUNNING,
- stateMachine);
+ stateMachine, "");
HeartbeatEndpointTask endpointTask =
new HeartbeatEndpointTask(rpcEndPoint, conf, stateContext,
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index fa3e440259..88222c56a6 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -266,15 +266,10 @@ public class TestContainerCommandsEC {
scm.getPipelineManager().closePipeline(orphanPipeline, false);
// Find the datanode hosting Replica index = 2
- DatanodeDetails dn2 = null;
HddsDatanodeService dn2Service = null;
List<DatanodeDetails> pipelineNodes = orphanPipeline.getNodes();
- for (DatanodeDetails node : pipelineNodes) {
- if (orphanPipeline.getReplicaIndex(node) == 2) {
- dn2 = node;
- break;
- }
- }
+ DatanodeDetails dn2 = findDatanodeWithIndex(
+ orphanPipeline, pipelineNodes, 2);
// Find the Cluster node corresponding to the datanode hosting index = 2
for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
if (dn.getDatanodeDetails().equals(dn2)) {
@@ -363,8 +358,9 @@ public class TestContainerCommandsEC {
}
try (ECReconstructionCoordinator coordinator =
- new ECReconstructionCoordinator(config, certClient, secretKeyClient,
- null, ECReconstructionMetrics.create())) {
+ new ECReconstructionCoordinator(config, certClient,
+ secretKeyClient, null,
+ ECReconstructionMetrics.create(), "")) {
// Attempt to reconstruct the container.
coordinator.reconstructECContainerGroup(orphanContainerID,
@@ -390,6 +386,16 @@ public class TestContainerCommandsEC {
}
}
+ private DatanodeDetails findDatanodeWithIndex(Pipeline orphanPipeline,
+ List<DatanodeDetails> pipelineNodes, int index) {
+ for (DatanodeDetails node : pipelineNodes) {
+ if (orphanPipeline.getReplicaIndex(node) == index) {
+ return node;
+ }
+ }
+ return null;
+ }
+
@Test
public void testListBlock() throws Exception {
for (int i = 0; i < datanodeDetails.size(); i++) {
@@ -581,7 +587,7 @@ public class TestContainerCommandsEC {
new XceiverClientManager(config);
ECReconstructionCoordinator coordinator =
new ECReconstructionCoordinator(config, certClient,
secretKeyClient,
- null, ECReconstructionMetrics.create())) {
+ null, ECReconstructionMetrics.create(), "2")) {
ECReconstructionMetrics metrics =
coordinator.getECReconstructionMetrics();
@@ -776,8 +782,9 @@ public class TestContainerCommandsEC {
Assert.assertThrows(IOException.class, () -> {
try (ECReconstructionCoordinator coordinator =
- new ECReconstructionCoordinator(config, certClient, secretKeyClient,
- null, ECReconstructionMetrics.create())) {
+ new ECReconstructionCoordinator(config, certClient,
+ secretKeyClient,
+ null, ECReconstructionMetrics.create(), "")) {
coordinator.reconstructECContainerGroup(conID,
(ECReplicationConfig) containerPipeline.getReplicationConfig(),
sourceNodeMap, targetNodeMap);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]