This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mergemaster0808 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bc94e5aee534b1910396f422ff6df5b4dd6a3b74 Author: Christofer Dutz <[email protected]> AuthorDate: Tue Jul 30 07:17:48 2024 +0200 Enhance remove-datanode function (cherry picked from commit 04ba236ef64a370bfefb762182a9c0b1363f8fee) --- .../main/java/org/apache/iotdb/rpc/UrlUtils.java | 9 +- .../thrift/ConfigNodeRPCServiceHandler.java | 8 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 48 +++-- .../thrift/ConfigNodeRPCServiceProcessorTest.java | 164 ++++++++++++++++ .../db/service/DataNodeServerCommandLine.java | 145 ++++++++------ .../db/service/DataNodeServerCommandLineTest.java | 218 +++++++++++++++++++++ 6 files changed, 512 insertions(+), 80 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/UrlUtils.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/UrlUtils.java index a7994a8a520..39b2390a736 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/UrlUtils.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/UrlUtils.java @@ -23,7 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; /** The UrlUtils */ public class UrlUtils { - private static final String POINT_COLON = ":"; + private static final String PORT_SEPARATOR = ":"; private static final String ABB_COLON = "["; private UrlUtils() {} @@ -37,10 +37,11 @@ public class UrlUtils { */ public static TEndPoint parseTEndPointIpv4AndIpv6Url(String endPointUrl) { TEndPoint endPoint = new TEndPoint(); - if (endPointUrl.contains(POINT_COLON)) { - int point_position = endPointUrl.lastIndexOf(POINT_COLON); - String port = endPointUrl.substring(endPointUrl.lastIndexOf(POINT_COLON) + 1); + if (endPointUrl.contains(PORT_SEPARATOR)) { + int point_position = endPointUrl.lastIndexOf(PORT_SEPARATOR); + String port = endPointUrl.substring(endPointUrl.lastIndexOf(PORT_SEPARATOR) + 1); String ip = endPointUrl.substring(0, point_position); + // If the ip/host part is provided as IPv6 address, cut off the surrounding square 
brackets. if (ip.contains(ABB_COLON)) { ip = ip.substring(1, ip.length() - 1); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceHandler.java index f9892f2ae62..abe4c03f861 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceHandler.java @@ -27,7 +27,7 @@ import org.apache.thrift.transport.TTransport; import java.util.concurrent.atomic.AtomicLong; public class ConfigNodeRPCServiceHandler implements TServerEventHandler { - private AtomicLong thriftConnectionNumber = new AtomicLong(0); + private final AtomicLong thriftConnectionNumber = new AtomicLong(0); public ConfigNodeRPCServiceHandler() { MetricService.getInstance() @@ -35,13 +35,13 @@ public class ConfigNodeRPCServiceHandler implements TServerEventHandler { } @Override - public ServerContext createContext(TProtocol arg0, TProtocol arg1) { + public ServerContext createContext(TProtocol input, TProtocol output) { thriftConnectionNumber.incrementAndGet(); return null; } @Override - public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) { + public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) { thriftConnectionNumber.decrementAndGet(); } @@ -51,7 +51,7 @@ public class ConfigNodeRPCServiceHandler implements TServerEventHandler { } @Override - public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) { + public void processContext(ServerContext serverContext, TTransport input, TTransport output) { // nothing } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 5597610980f..fd7db9ce3ca 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -32,6 +32,7 @@ import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq; import org.apache.iotdb.common.rpc.thrift.TShowConfigurationResp; import org.apache.iotdb.common.rpc.thrift.TShowTTLReq; import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp; +import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.path.PartialPath; @@ -200,14 +201,26 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNodeRPCServiceProcessor.class); - private static final ConfigNodeConfig CONFIG_NODE_CONFIG = - ConfigNodeDescriptor.getInstance().getConf(); - - protected ConfigManager configManager; - - protected ConfigNodeRPCServiceProcessor() {} + protected final CommonConfig commonConfig; + protected final ConfigNodeConfig configNodeConfig; + protected final ConfigNode configNode; + protected final ConfigManager configManager; public ConfigNodeRPCServiceProcessor(ConfigManager configManager) { + this.commonConfig = CommonDescriptor.getInstance().getConfig(); + this.configNodeConfig = ConfigNodeDescriptor.getInstance().getConf(); + this.configNode = ConfigNode.getInstance(); + this.configManager = configManager; + } + + public ConfigNodeRPCServiceProcessor( + CommonConfig commonConfig, + ConfigNodeConfig configNodeConfig, + ConfigNode configNode, + ConfigManager configManager) { + this.commonConfig = commonConfig; + this.configNodeConfig = 
configNodeConfig; + this.configNode = configNode; this.configManager = configManager; } @@ -320,7 +333,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac if (isSystemDatabase) { databaseSchema.setSchemaReplicationFactor(1); } else if (!databaseSchema.isSetSchemaReplicationFactor()) { - databaseSchema.setSchemaReplicationFactor(CONFIG_NODE_CONFIG.getSchemaReplicationFactor()); + databaseSchema.setSchemaReplicationFactor(configNodeConfig.getSchemaReplicationFactor()); } else if (databaseSchema.getSchemaReplicationFactor() <= 0) { errorResp = new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode()) @@ -331,7 +344,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac if (isSystemDatabase) { databaseSchema.setDataReplicationFactor(1); } else if (!databaseSchema.isSetDataReplicationFactor()) { - databaseSchema.setDataReplicationFactor(CONFIG_NODE_CONFIG.getDataReplicationFactor()); + databaseSchema.setDataReplicationFactor(configNodeConfig.getDataReplicationFactor()); } else if (databaseSchema.getDataReplicationFactor() <= 0) { errorResp = new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode()) @@ -340,8 +353,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac } if (!databaseSchema.isSetTimePartitionOrigin()) { - databaseSchema.setTimePartitionOrigin( - CommonDescriptor.getInstance().getConfig().getTimePartitionOrigin()); + databaseSchema.setTimePartitionOrigin(commonConfig.getTimePartitionOrigin()); } else if (databaseSchema.getTimePartitionOrigin() < 0) { errorResp = new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode()) @@ -350,8 +362,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac } if (!databaseSchema.isSetTimePartitionInterval()) { - databaseSchema.setTimePartitionInterval( - CommonDescriptor.getInstance().getConfig().getTimePartitionInterval()); + 
databaseSchema.setTimePartitionInterval(commonConfig.getTimePartitionInterval()); } else if (databaseSchema.getTimePartitionInterval() <= 0) { errorResp = new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode()) @@ -363,7 +374,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac databaseSchema.setMinSchemaRegionGroupNum(1); } else if (!databaseSchema.isSetMinSchemaRegionGroupNum()) { databaseSchema.setMinSchemaRegionGroupNum( - CONFIG_NODE_CONFIG.getDefaultSchemaRegionGroupNumPerDatabase()); + configNodeConfig.getDefaultSchemaRegionGroupNumPerDatabase()); } else if (databaseSchema.getMinSchemaRegionGroupNum() <= 0) { errorResp = new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode()) @@ -375,7 +386,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac databaseSchema.setMinDataRegionGroupNum(1); } else if (!databaseSchema.isSetMinDataRegionGroupNum()) { databaseSchema.setMinDataRegionGroupNum( - CONFIG_NODE_CONFIG.getDefaultDataRegionGroupNumPerDatabase()); + configNodeConfig.getDefaultDataRegionGroupNumPerDatabase()); } else if (databaseSchema.getMinDataRegionGroupNum() <= 0) { errorResp = new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode()) @@ -690,7 +701,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac LOGGER.info( "{} has successfully started and joined the cluster: {}.", ConfigNodeConstant.GLOBAL_NAME, - ConfigNodeDescriptor.getInstance().getConf().getClusterName()); + configNodeConfig.getClusterName()); return StatusUtils.OK; } @@ -735,6 +746,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac @Override public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) { new Thread( + // TODO: Perhaps we should find some other way of shutting down the config node, adding + // a hard dependency + // in order to do this feels a bit odd. 
Dispatching a shutdown event which is processed + // where the + // instance is created feels cleaner. () -> { try { // Sleep 1s before stop itself @@ -743,7 +759,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac Thread.currentThread().interrupt(); LOGGER.warn(e.getMessage()); } finally { - ConfigNode.getInstance().stop(); + configNode.stop(); } }) .start(); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java new file mode 100644 index 00000000000..c4d993b5a79 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.confignode.service.thrift; + +import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.confignode.conf.ConfigNodeConfig; +import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp; +import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration; +import org.apache.iotdb.confignode.service.ConfigNode; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.TimeoutChangeableTFastFramedTransport; + +import junit.framework.TestCase; +import org.apache.thrift.transport.TSocket; +import org.junit.Assert; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.net.InetAddress; +import java.net.Socket; +import java.util.Collections; + +public class ConfigNodeRPCServiceProcessorTest extends TestCase { + + /** + * This test should be a normal data-node registration where a valid ip is used as address of the + * rpc-service. Nothing special should happen here. + * + * @throws Exception nothing should go wrong here. + */ + public void testRegisterDataNode() throws Exception { + // Set up the system under test. 
+ CommonConfig commonConfig = Mockito.mock(CommonConfig.class); + ConfigNodeConfig configNodeConfig = Mockito.mock(ConfigNodeConfig.class); + ConfigNode configNode = Mockito.mock(ConfigNode.class); + ConfigManager configManager = Mockito.mock(ConfigManager.class); + DataNodeRegisterResp registerDataNodeResponse = new DataNodeRegisterResp(); + registerDataNodeResponse.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + registerDataNodeResponse.setConfigNodeList( + Collections.singletonList(new TConfigNodeLocation())); + registerDataNodeResponse.setDataNodeId(42); + registerDataNodeResponse.setRuntimeConfiguration(new TRuntimeConfiguration()); + Mockito.when(configManager.registerDataNode(Mockito.any(TDataNodeRegisterReq.class))) + .thenReturn(registerDataNodeResponse); + Socket socket = Mockito.mock(Socket.class); + Mockito.when(socket.getInetAddress()) + .thenReturn(InetAddress.getByAddress(new byte[] {1, 2, 3, 4})); + TSocket tSocket = Mockito.mock(TSocket.class); + Mockito.when(tSocket.getSocket()).thenReturn(socket); + TimeoutChangeableTFastFramedTransport transport = + Mockito.mock(TimeoutChangeableTFastFramedTransport.class); + Mockito.when(transport.getSocket()).thenReturn(tSocket); + ConfigNodeRPCServiceProcessor sut = + new ConfigNodeRPCServiceProcessor( + commonConfig, configNodeConfig, configNode, configManager); + + // Prepare the test input + TDataNodeLocation newDataNodeLocation = new TDataNodeLocation(); + newDataNodeLocation.setDataNodeId(42); + newDataNodeLocation.setClientRpcEndPoint(new TEndPoint("1.2.3.4", 6667)); + TDataNodeConfiguration newDataNodeConfiguration = new TDataNodeConfiguration(); + newDataNodeConfiguration.setLocation(newDataNodeLocation); + TDataNodeRegisterReq req = new TDataNodeRegisterReq(); + req.setClusterName("test-cluster"); + req.setDataNodeConfiguration(newDataNodeConfiguration); + + // Execute the test logic + TDataNodeRegisterResp res = sut.registerDataNode(req); + + // Check the result + 
Assert.assertEquals(registerDataNodeResponse.convertToRpcDataNodeRegisterResp(), res); + // Check that the config manager was called to register a new node + ArgumentCaptor<TDataNodeRegisterReq> acRequest = + ArgumentCaptor.forClass(TDataNodeRegisterReq.class); + Mockito.verify(configManager, Mockito.times(1)).registerDataNode(acRequest.capture()); + TDataNodeRegisterReq sentRequest = acRequest.getValue(); + Assert.assertEquals( + "1.2.3.4", + sentRequest.getDataNodeConfiguration().getLocation().getClientRpcEndPoint().getIp()); + } + + /** + * This test should be a normal data-node restart where a valid ip is used as address of the + * rpc-service. Nothing special should happen here. + * + * @throws Exception nothing should go wrong here. + */ + public void testRestartDataNode() throws Exception { + // Set up the system under test. + CommonConfig commonConfig = Mockito.mock(CommonConfig.class); + ConfigNodeConfig configNodeConfig = Mockito.mock(ConfigNodeConfig.class); + ConfigNode configNode = Mockito.mock(ConfigNode.class); + ConfigManager configManager = Mockito.mock(ConfigManager.class); + TDataNodeRestartResp restartDataNodeResponse = new TDataNodeRestartResp(); + restartDataNodeResponse.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + restartDataNodeResponse.setConfigNodeList(Collections.singletonList(new TConfigNodeLocation())); + restartDataNodeResponse.setRuntimeConfiguration(new TRuntimeConfiguration()); + Mockito.when(configManager.restartDataNode(Mockito.any(TDataNodeRestartReq.class))) + .thenReturn(restartDataNodeResponse); + Socket socket = Mockito.mock(Socket.class); + Mockito.when(socket.getInetAddress()) + .thenReturn(InetAddress.getByAddress(new byte[] {1, 2, 3, 4})); + TSocket tSocket = Mockito.mock(TSocket.class); + Mockito.when(tSocket.getSocket()).thenReturn(socket); + TimeoutChangeableTFastFramedTransport transport = + Mockito.mock(TimeoutChangeableTFastFramedTransport.class); + 
Mockito.when(transport.getSocket()).thenReturn(tSocket); + ConfigNodeRPCServiceProcessor sut = + new ConfigNodeRPCServiceProcessor( + commonConfig, configNodeConfig, configNode, configManager); + + // Prepare the test input + TDataNodeLocation newDataNodeLocation = new TDataNodeLocation(); + newDataNodeLocation.setDataNodeId(42); + newDataNodeLocation.setClientRpcEndPoint(new TEndPoint("1.2.3.4", 6667)); + TDataNodeConfiguration newDataNodeConfiguration = new TDataNodeConfiguration(); + newDataNodeConfiguration.setLocation(newDataNodeLocation); + TDataNodeRestartReq req = new TDataNodeRestartReq(); + req.setClusterName("test-cluster"); + req.setDataNodeConfiguration(newDataNodeConfiguration); + + // Execute the test logic + TDataNodeRestartResp res = sut.restartDataNode(req); + + // Check the result + Assert.assertEquals(restartDataNodeResponse, res); + // Check that the config manager was called to register a new node + ArgumentCaptor<TDataNodeRestartReq> acRequest = + ArgumentCaptor.forClass(TDataNodeRestartReq.class); + Mockito.verify(configManager, Mockito.times(1)).restartDataNode(acRequest.capture()); + TDataNodeRestartReq sentRequest = acRequest.getValue(); + // In this case we expect the ConfigNodeRPCServiceProcessor to have replaced the + // ip of "0.0.0.0" with the IP it got the request from. 
+ Assert.assertEquals( + "1.2.3.4", + sentRequest.getDataNodeConfiguration().getLocation().getClientRpcEndPoint().getIp()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java index d6760111032..92f55888d83 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java @@ -20,12 +20,12 @@ package org.apache.iotdb.db.service; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.ServerCommandLine; +import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; +import org.apache.iotdb.commons.consensus.ConfigRegionId; import org.apache.iotdb.commons.exception.BadNodeUrlException; import org.apache.iotdb.commons.exception.IoTDBException; -import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; @@ -33,6 +33,7 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.commons.lang3.math.NumberUtils; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +42,6 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; -import static org.apache.commons.lang3.StringUtils.isNumeric; - public class DataNodeServerCommandLine extends ServerCommandLine { private static 
final Logger LOGGER = LoggerFactory.getLogger(DataNodeServerCommandLine.class); @@ -53,12 +52,39 @@ public class DataNodeServerCommandLine extends ServerCommandLine { // metaport-of-removed-node public static final String MODE_REMOVE = "-r"; + private final ConfigNodeInfo configNodeInfo; + private final IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager; + private final DataNode dataNode; + private static final String USAGE = "Usage: <-s|-r> " + "[-D{} <configure folder>] \n" + "-s: start the node to the cluster\n" + "-r: remove the node out of the cluster\n"; + /** Default constructor using the singletons for initializing the relationship. */ + public DataNodeServerCommandLine() { + configNodeInfo = ConfigNodeInfo.getInstance(); + configNodeClientManager = ConfigNodeClientManager.getInstance(); + dataNode = DataNode.getInstance(); + } + + /** + * Additional constructor allowing injection of custom instances (mainly for testing) + * + * @param configNodeInfo config node info + * @param configNodeClientManager config node client manager + * @param dataNode data node + */ + public DataNodeServerCommandLine( + ConfigNodeInfo configNodeInfo, + IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager, + DataNode dataNode) { + this.configNodeInfo = configNodeInfo; + this.configNodeClientManager = configNodeClientManager; + this.dataNode = dataNode; + } + @Override protected String getUsage() { return USAGE; @@ -71,8 +97,6 @@ public class DataNodeServerCommandLine extends ServerCommandLine { return -1; } - DataNode dataNode = DataNode.getInstance(); - String mode = args[0]; LOGGER.info("Running mode {}", mode); @@ -80,7 +104,7 @@ public class DataNodeServerCommandLine extends ServerCommandLine { if (MODE_START.equals(mode)) { dataNode.doAddNode(); } else if (MODE_REMOVE.equals(mode)) { - doRemoveDataNode(args); + return doRemoveDataNode(args); } else { LOGGER.error("Unrecognized mode {}", mode); } @@ -88,22 +112,25 @@ public class 
DataNodeServerCommandLine extends ServerCommandLine { } /** - * remove datanodes from cluster + * remove data-nodes from cluster * * @param args id or ip:rpc_port for removed datanode */ - private void doRemoveDataNode(String[] args) + private int doRemoveDataNode(String[] args) throws BadNodeUrlException, TException, IoTDBException, ClientManagerException { if (args.length != 2) { LOGGER.info("Usage: <node-id>/<ip>:<rpc-port>"); - return; + return -1; } + // REMARK: Don't need null or empty-checks for args[0] or args[1], as if they were + // empty, the JVM would have not received them. + LOGGER.info("Starting to remove DataNode from cluster, parameter: {}, {}", args[0], args[1]); // Load ConfigNodeList from system.properties file - ConfigNodeInfo.getInstance().loadConfigNodeList(); + configNodeInfo.loadConfigNodeList(); List<TDataNodeLocation> dataNodeLocations = buildDataNodeLocations(args[1]); if (dataNodeLocations.isEmpty()) { @@ -112,7 +139,7 @@ public class DataNodeServerCommandLine extends ServerCommandLine { LOGGER.info("Start to remove datanode, removed datanode endpoints: {}", dataNodeLocations); TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(dataNodeLocations); try (ConfigNodeClient configNodeClient = - ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TDataNodeRemoveResp removeResp = configNodeClient.removeDataNode(removeReq); LOGGER.info("Remove result {} ", removeResp); if (removeResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { @@ -125,6 +152,7 @@ public class DataNodeServerCommandLine extends ServerCommandLine { + "and after the process of removing datanode ends successfully, " + "you are supposed to delete directory and data of the removed-datanode manually"); } + return 0; } /** @@ -135,57 +163,62 @@ public class DataNodeServerCommandLine extends ServerCommandLine { */ private List<TDataNodeLocation> 
buildDataNodeLocations(String args) { List<TDataNodeLocation> dataNodeLocations = new ArrayList<>(); - if (args == null || args.trim().isEmpty()) { - return dataNodeLocations; - } // Now support only single datanode deletion if (args.split(",").length > 1) { - LOGGER.info("Incorrect input format, usage: <id>/<ip>:<rpc-port>"); - return dataNodeLocations; + throw new IllegalArgumentException("Currently only removing single nodes is supported."); } // Below supports multiple datanode deletion, split by ',', and is reserved for extension - try { - List<TEndPoint> endPoints = NodeUrlUtils.parseTEndPointUrls(args); - try (ConfigNodeClient client = - ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - dataNodeLocations = - client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream() - .map(TDataNodeConfiguration::getLocation) - .filter(location -> endPoints.contains(location.getClientRpcEndPoint())) - .collect(Collectors.toList()); - } catch (TException | ClientManagerException e) { - LOGGER.error("Get data node locations failed", e); - } - } catch (BadNodeUrlException e) { - try (ConfigNodeClient client = - ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - for (String id : args.split(",")) { - if (!isNumeric(id)) { - LOGGER.warn("Incorrect id format {}, skipped...", id); - continue; - } - List<TDataNodeLocation> nodeLocationResult = - client - .getDataNodeConfiguration(Integer.parseInt(id)) - .getDataNodeConfigurationMap() - .values() - .stream() - .map(TDataNodeConfiguration::getLocation) - .collect(Collectors.toList()); - if (nodeLocationResult.isEmpty()) { - LOGGER.warn("DataNode {} is not in cluster, skipped...", id); - continue; - } - if (!dataNodeLocations.contains(nodeLocationResult.get(0))) { - dataNodeLocations.add(nodeLocationResult.get(0)); - } - } - } catch (TException | ClientManagerException e1) { - LOGGER.error("Get data node locations failed", e); - } + 
List<NodeCoordinate> nodeCoordinates = parseCoordinates(args); + try (ConfigNodeClient client = + configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + dataNodeLocations = + client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream() + .map(TDataNodeConfiguration::getLocation) + .filter( + location -> + nodeCoordinates.stream() + .anyMatch(nodeCoordinate -> nodeCoordinate.matches(location))) + .collect(Collectors.toList()); + } catch (TException | ClientManagerException e) { + LOGGER.error("Get data node locations failed", e); } + return dataNodeLocations; } + + protected List<NodeCoordinate> parseCoordinates(String coordinatesString) { + // Multiple nodeIds are separated by "," + String[] nodeIdStrings = coordinatesString.split(","); + List<NodeCoordinate> nodeIdCoordinates = new ArrayList<>(nodeIdStrings.length); + for (String nodeId : nodeIdStrings) { + // In the other case, we expect it to be a numeric value referring to the node-id + if (NumberUtils.isCreatable(nodeId)) { + nodeIdCoordinates.add(new NodeCoordinateNodeId(Integer.parseInt(nodeId))); + } else { + LOGGER.error("Invalid format. Expected a numeric node id, but got: {}", nodeId); + } + } + return nodeIdCoordinates; + } + + protected interface NodeCoordinate { + // Returns true if the given location matches this coordinate + boolean matches(TDataNodeLocation location); + } + + /** Implementation of a NodeCoordinate that uses the node id to match. 
*/ + protected static class NodeCoordinateNodeId implements NodeCoordinate { + private final int nodeId; + + public NodeCoordinateNodeId(int nodeId) { + this.nodeId = nodeId; + } + + @Override + public boolean matches(TDataNodeLocation location) { + return location.isSetDataNodeId() && location.dataNodeId == nodeId; + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeServerCommandLineTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeServerCommandLineTest.java new file mode 100644 index 00000000000..ab37e6bc64a --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeServerCommandLineTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iotdb.db.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TNodeResource;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.ConfigRegionId;
+import org.apache.iotdb.commons.exception.BadNodeUrlException;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import junit.framework.TestCase;
+import org.junit.Assert;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/**
+ * Tests for the argument handling and the remove-datanode flow of {@link
+ * DataNodeServerCommandLine}.
+ *
+ * <p>All collaborators are Mockito mocks. The mocked config node client always reports {@link
+ * #LOCATION_1}, {@link #LOCATION_2} and {@link #LOCATION_3} (ids 1 to 3) as the known data nodes.
+ */
+public class DataNodeServerCommandLineTest extends TestCase {
+
+  // Well known locations, registered with the mocked config node under the ids 1, 2 and 3.
+  protected static final TDataNodeLocation LOCATION_1 =
+      new TDataNodeLocation(1, new TEndPoint("1.2.3.4", 6667), null, null, null, null);
+  protected static final TDataNodeLocation LOCATION_2 =
+      new TDataNodeLocation(2, new TEndPoint("1.2.3.5", 6667), null, null, null, null);
+  protected static final TDataNodeLocation LOCATION_3 =
+      new TDataNodeLocation(3, new TEndPoint("1.2.3.6", 6667), null, null, null, null);
+  // An invalid location: its id (23) is never registered with the mocked config node.
+  protected static final TDataNodeLocation INVALID_LOCATION =
+      new TDataNodeLocation(23, new TEndPoint("4.3.2.1", 815), null, null, null, null);
+
+  /**
+   * In this test we pass an empty args list to the command. This is expected to fail.
+   *
+   * @throws Exception nothing should go wrong here.
+   */
+  public void testNoArgs() throws Exception {
+    DataNodeServerCommandLine sut = newCommandLineWithoutCollaborators();
+
+    int returnCode = sut.run(new String[0]);
+
+    // We expect an error code of -1.
+    Assert.assertEquals(-1, returnCode);
+  }
+
+  /**
+   * In this test we pass too many arguments to the command. This should also fail with an error
+   * code.
+   *
+   * @throws Exception nothing should go wrong here.
+   */
+  public void testTooManyArgs() throws Exception {
+    DataNodeServerCommandLine sut = newCommandLineWithoutCollaborators();
+
+    int returnCode = sut.run(new String[] {"-r", "2", "-s"});
+
+    // We expect an error code of -1.
+    Assert.assertEquals(-1, returnCode);
+  }
+
+  /**
+   * In this test case we provide the coordinates for the data-node that we want to delete by
+   * providing the node-id of that node.
+   *
+   * @throws Exception nothing should go wrong here.
+   */
+  public void testSingleDataNodeRemoveById() throws Exception {
+    ConfigNodeClient client = mockClientWithKnownDataNodes();
+    // Only return something sensible, if exactly this location is asked to be deleted.
+    TDataNodeRemoveReq expectedRequest =
+        new TDataNodeRemoveReq(Collections.singletonList(LOCATION_2));
+    stubSuccessfulRemoval(client, expectedRequest);
+    DataNodeServerCommandLine sut = newCommandLine(client);
+
+    int returnCode = sut.run(new String[] {"-r", "2"});
+
+    // Check the overall return code was ok.
+    Assert.assertEquals(0, returnCode);
+    // Check that the config node client was actually called with a request to remove the
+    // node we want it to remove
+    Mockito.verify(client, Mockito.times(1)).removeDataNode(expectedRequest);
+  }
+
+  /**
+   * In this test case we provide the coordinates for the data-node that we want to delete by
+   * providing the node-id of that node. However, the coordinates are invalid and therefore the
+   * deletion fails with an error.
+   *
+   * @throws Exception only if the test set-up itself fails; the failure of the command is caught
+   *     and checked inside the test.
+   */
+  public void testSingleDataNodeRemoveByIdWithInvalidCoordinates() throws Exception {
+    ConfigNodeClient client = mockClientWithKnownDataNodes();
+    DataNodeServerCommandLine sut = newCommandLine(client);
+
+    // The id 23 is not part of the configuration reported by the mocked client.
+    assertRunFailsWith(sut, new String[] {"-r", "23"}, BadNodeUrlException.class);
+  }
+
+  /**
+   * In this test case we provide the coordinates for the data-node that we want to delete by
+   * providing the node-id of that node. NOTE: The test was prepared to test deletion of multiple
+   * nodes, however currently we don't support this.
+   *
+   * @throws Exception only if the test set-up itself fails; the failure of the command is caught
+   *     and checked inside the test.
+   */
+  public void testMultipleDataNodeRemoveById() throws Exception {
+    ConfigNodeClient client = mockClientWithKnownDataNodes();
+    // Only return something sensible, if exactly the locations we want are asked to be deleted.
+    // This stub is kept from the multi-node preparation mentioned above, although the call below
+    // is currently expected to be rejected.
+    stubSuccessfulRemoval(
+        client, new TDataNodeRemoveReq(Arrays.asList(LOCATION_1, LOCATION_2)));
+    DataNodeServerCommandLine sut = newCommandLine(client);
+
+    assertRunFailsWith(sut, new String[] {"-r", "1,2"}, IllegalArgumentException.class);
+  }
+
+  /**
+   * Creates the command with {@code null} collaborators. This is only suitable for tests in which
+   * argument validation is expected to fail before any collaborator is used.
+   */
+  private static DataNodeServerCommandLine newCommandLineWithoutCollaborators() throws Exception {
+    // Typed locals (instead of bare null literals) keep the constructor call unambiguous.
+    ConfigNodeInfo configNodeInfo = null;
+    IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager = null;
+    DataNode dataNode = null;
+    return new DataNodeServerCommandLine(configNodeInfo, configNodeClientManager, dataNode);
+  }
+
+  /** Creates the command wired to mocks, with a client manager that always lends {@code client}. */
+  private static DataNodeServerCommandLine newCommandLine(ConfigNodeClient client)
+      throws Exception {
+    ConfigNodeInfo configNodeInfo = Mockito.mock(ConfigNodeInfo.class);
+    DataNode dataNode = Mockito.mock(DataNode.class);
+    return new DataNodeServerCommandLine(configNodeInfo, mockClientManagerFor(client), dataNode);
+  }
+
+  /** Creates a mocked client manager whose {@code borrowClient} always returns {@code client}. */
+  @SuppressWarnings("unchecked") // Mockito.mock(Class) cannot carry the generic type arguments.
+  private static IClientManager<ConfigRegionId, ConfigNodeClient> mockClientManagerFor(
+      ConfigNodeClient client) throws Exception {
+    IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager =
+        Mockito.mock(IClientManager.class);
+    Mockito.when(configNodeClientManager.borrowClient(Mockito.any(ConfigRegionId.class)))
+        .thenReturn(client);
+    return configNodeClientManager;
+  }
+
+  /**
+   * Creates a mocked config node client whose {@code getDataNodeConfiguration} returns, for any
+   * argument, a configuration containing the three well known locations under the ids 1, 2 and 3.
+   */
+  private static ConfigNodeClient mockClientWithKnownDataNodes() throws Exception {
+    ConfigNodeClient client = Mockito.mock(ConfigNodeClient.class);
+    // This is the result of the getDataNodeConfiguration, which contains the list of known data
+    // nodes.
+    TDataNodeConfigurationResp tDataNodeConfigurationResp = new TDataNodeConfigurationResp();
+    tDataNodeConfigurationResp.putToDataNodeConfigurationMap(
+        1, new TDataNodeConfiguration(LOCATION_1, new TNodeResource()));
+    tDataNodeConfigurationResp.putToDataNodeConfigurationMap(
+        2, new TDataNodeConfiguration(LOCATION_2, new TNodeResource()));
+    tDataNodeConfigurationResp.putToDataNodeConfigurationMap(
+        3, new TDataNodeConfiguration(LOCATION_3, new TNodeResource()));
+    Mockito.when(client.getDataNodeConfiguration(Mockito.anyInt()))
+        .thenReturn(tDataNodeConfigurationResp);
+    return client;
+  }
+
+  /** Makes {@code client} answer exactly {@code request} with a SUCCESS_STATUS response. */
+  private static void stubSuccessfulRemoval(ConfigNodeClient client, TDataNodeRemoveReq request)
+      throws Exception {
+    Mockito.when(client.removeDataNode(request))
+        .thenReturn(
+            new TDataNodeRemoveResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+  }
+
+  /**
+   * Runs the command and asserts that it throws an exception of the given type.
+   *
+   * @param sut the command under test.
+   * @param args the command line arguments to pass to {@code run}.
+   * @param expectedType the exception type the call is expected to fail with.
+   */
+  private static void assertRunFailsWith(
+      DataNodeServerCommandLine sut, String[] args, Class<? extends Exception> expectedType) {
+    try {
+      sut.run(args);
+    } catch (Exception e) {
+      // This is actually what we expected, as long as the type matches.
+      Assert.assertTrue(
+          "Expected " + expectedType.getName() + " but got " + e, expectedType.isInstance(e));
+      return;
+    }
+    Assert.fail("This call should have failed");
+  }
+}
