This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch region_migration
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/region_migration by this push:
new e6d0e4d8ed7 Split configuration and add resetPeerList for region
migration (#12101)
e6d0e4d8ed7 is described below
commit e6d0e4d8ed7378d7be58dd8e90b02c400161a1ce
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Tue Mar 5 14:40:51 2024 +0800
Split configuration and add resetPeerList for region migration (#12101)
---
.../org/apache/iotdb/consensus/IConsensus.java | 13 +++
.../apache/iotdb/consensus/iot/IoTConsensus.java | 27 ++++++
.../consensus/iot/IoTConsensusServerImpl.java | 97 +++++++++++++++-------
.../iotdb/consensus/ratis/RatisConsensus.java | 37 +++++++++
.../iotdb/consensus/simple/SimpleConsensus.java | 6 ++
5 files changed, 151 insertions(+), 29 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
index 4ee908a7af8..dc3942ca071 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
@@ -143,6 +143,19 @@ public interface IConsensus {
*/
void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws
ConsensusException;
+ /**
+ * Reset the peer list of the corresponding consensus group. Currently only
used in the automatic
+ * cleanup of region migration as a rollback for {@link
#addRemotePeer(ConsensusGroupId, Peer)},
+ * so it will only be less but not more.
+ *
+ * @param groupId the consensus group
+ * @param peers the new peer list
+ * @return reset result
+ * @throws ConsensusException when resetPeerList doesn't success with other
reasons
+ * @throws ConsensusGroupNotExistException when the specified consensus
group doesn't exist
+ */
+ TSStatus resetPeerList(ConsensusGroupId groupId, List<Peer> peers) throws
ConsensusException;
+
// management API
/**
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 94e116882e0..fd42f127244 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -393,6 +393,33 @@ public class IoTConsensus implements IConsensus {
return new ArrayList<>(stateMachineMap.keySet());
}
+ public TSStatus resetPeerList(ConsensusGroupId groupId, List<Peer> peers)
+ throws ConsensusException {
+ IoTConsensusServerImpl impl =
+ Optional.ofNullable(stateMachineMap.get(groupId))
+ .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+ if (impl.isReadOnly()) {
+ return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
+ } else if (!impl.isActive()) {
+ return RpcUtils.getStatus(
+ TSStatusCode.WRITE_PROCESS_REJECT,
+ "peer is inactive and not ready to receive reset configuration
request.");
+ } else {
+ for (Peer peer : impl.getConfiguration()) {
+ if (!peers.contains(peer)) {
+ try {
+ removeRemotePeer(groupId, peer);
+ } catch (ConsensusException e) {
+ logger.error("Failed to remove peer {} from group {}", peer,
groupId, e);
+ return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage());
+ }
+ }
+ }
+ impl.resetConfiguration(peers);
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+ }
+
public IoTConsensusServerImpl getImpl(ConsensusGroupId groupId) {
return stateMachineMap.get(groupId);
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 800754ddba6..6fd4bebae16 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -605,25 +605,13 @@ public class IoTConsensusServerImpl {
public void persistConfigurationUpdate() throws
ConsensusGroupModifyPeerException {
try {
serializeConfigurationAndFsyncToDisk(CONFIGURATION_TMP_FILE_NAME);
- Path tmpConfigurationPath =
- Paths.get(new File(storageDir,
CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
- Path configurationPath =
- Paths.get(new File(storageDir,
CONFIGURATION_FILE_NAME).getAbsolutePath());
- Files.deleteIfExists(configurationPath);
- Files.move(tmpConfigurationPath, configurationPath);
+ tmpConfigurationUpdate(configuration);
} catch (IOException e) {
throw new ConsensusGroupModifyPeerException(
"Unexpected error occurs when update configuration", e);
}
}
- private void serializeConfigurationTo(DataOutputStream outputStream) throws
IOException {
- outputStream.writeInt(configuration.size());
- for (Peer peer : configuration) {
- peer.serialize(outputStream);
- }
- }
-
public void recoverConfiguration() {
ByteBuffer buffer;
try {
@@ -635,15 +623,25 @@ public class IoTConsensusServerImpl {
// interrupted
// unexpectedly, we need substitute configuration with tmpConfiguration
file
if (Files.exists(tmpConfigurationPath)) {
- if (Files.exists(configurationPath)) {
- Files.delete(configurationPath);
- }
+ Files.deleteIfExists(configurationPath);
Files.move(tmpConfigurationPath, configurationPath);
}
- buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath));
- int size = buffer.getInt();
- for (int i = 0; i < size; i++) {
- configuration.add(Peer.deserialize(buffer));
+ if (Files.exists(configurationPath)) {
+ // recover from old configuration file
+ buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath));
+ int size = buffer.getInt();
+ for (int i = 0; i < size; i++) {
+ configuration.add(Peer.deserialize(buffer));
+ }
+ Files.delete(configurationPath);
+ persistConfiguration();
+ } else {
+ // recover from split configuration file
+ Path dirPath = Paths.get(storageDir);
+ List<Peer> tmpPeerList = getConfiguration(dirPath,
CONFIGURATION_TMP_FILE_NAME);
+ tmpConfigurationUpdate(tmpPeerList);
+ List<Peer> peerList = getConfiguration(dirPath,
CONFIGURATION_FILE_NAME);
+ configuration.addAll(peerList);
}
logger.info("Recover IoTConsensus server Impl, configuration: {}",
configuration);
} catch (IOException e) {
@@ -651,6 +649,42 @@ public class IoTConsensusServerImpl {
}
}
+ private void tmpConfigurationUpdate(List<Peer> tmpPeerList) throws
IOException {
+ for (Peer peer : tmpPeerList) {
+ Path tmpConfigurationPath =
+ Paths.get(
+ new File(storageDir, peer.getNodeId() + "_" +
CONFIGURATION_TMP_FILE_NAME)
+ .getAbsolutePath());
+ Path configurationPath =
+ Paths.get(
+ new File(storageDir, peer.getNodeId() + "_" +
CONFIGURATION_FILE_NAME)
+ .getAbsolutePath());
+ Files.deleteIfExists(configurationPath);
+ Files.move(tmpConfigurationPath, configurationPath);
+ }
+ }
+
+ private List<Peer> getConfiguration(Path dirPath, String
configurationFileName)
+ throws IOException {
+ ByteBuffer buffer;
+ List<Peer> tmpConfiguration = new ArrayList<>();
+ Path[] files =
+ Files.walk(dirPath)
+ .filter(Files::isRegularFile)
+ .filter(filePath ->
filePath.getFileName().toString().contains(configurationFileName))
+ .toArray(Path[]::new);
+ for (Path file : files) {
+ buffer = ByteBuffer.wrap(Files.readAllBytes(file));
+ tmpConfiguration.add(Peer.deserialize(buffer));
+ }
+ return tmpConfiguration;
+ }
+
+ public void resetConfiguration(List<Peer> newConfiguration) {
+ configuration.clear();
+ configuration.addAll(newConfiguration);
+ }
+
public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
IConsensusRequest request) {
if (request instanceof ComparableConsensusRequest) {
@@ -841,15 +875,20 @@ public class IoTConsensusServerImpl {
private void serializeConfigurationAndFsyncToDisk(String
configurationFileName)
throws IOException {
- FileOutputStream fileOutputStream =
- new FileOutputStream(new File(storageDir, configurationFileName));
- DataOutputStream outputStream = new DataOutputStream(fileOutputStream);
- try {
- serializeConfigurationTo(outputStream);
- } finally {
- fileOutputStream.flush();
- fileOutputStream.getFD().sync();
- outputStream.close();
+ for (Peer peer : configuration) {
+ String peerConfigurationFileName = peer.getNodeId() + "_" +
configurationFileName;
+ FileOutputStream fileOutputStream =
+ new FileOutputStream(new File(storageDir,
peerConfigurationFileName));
+ try (DataOutputStream outputStream = new
DataOutputStream(fileOutputStream)) {
+ peer.serialize(outputStream);
+ } finally {
+ try {
+ fileOutputStream.flush();
+ fileOutputStream.getFD().sync();
+ } catch (IOException e) {
+ logger.error("Failed to fsync the configuration file {}",
peerConfigurationFileName, e);
+ }
+ }
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index c627692b72d..86b77e16c8e 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -51,6 +51,7 @@ import
org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
import org.apache.iotdb.consensus.ratis.utils.Retriable;
import org.apache.iotdb.consensus.ratis.utils.RetryPolicy;
import org.apache.iotdb.consensus.ratis.utils.Utils;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.pool2.KeyedObjectPool;
@@ -537,6 +538,42 @@ class RatisConsensus implements IConsensus {
sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
}
+ @Override
+ public TSStatus resetPeerList(ConsensusGroupId groupId, List<Peer> peers)
+ throws ConsensusException {
+ RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
+ RaftGroup group = getGroupInfo(raftGroupId);
+
+ // pre-conditions: group exists and myself in this group
+ if (group == null || !group.getPeers().contains(myself)) {
+ throw new ConsensusGroupNotExistException(groupId);
+ }
+
+ TSStatus writeResult = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ for (Peer peer : peers) {
+ RaftPeer peerToRemove = Utils.fromPeerAndPriorityToRaftPeer(peer,
DEFAULT_PRIORITY);
+ // pre-condition: peer is a member of groupId
+ if (!group.getPeers().contains(peerToRemove)) {
+ throw new PeerAlreadyInConsensusGroupException(groupId, peer);
+ }
+ // update group peer information
+ List<RaftPeer> newConfig =
+ group.getPeers().stream()
+ .filter(raftPeer -> !raftPeer.equals(peerToRemove))
+ .collect(Collectors.toList());
+ RaftClientReply reply =
sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
+ if (!reply.isSuccess()) {
+ throw new RatisRequestFailedException(reply.getException());
+ }
+ try {
+ writeResult =
Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+ } catch (Exception e) {
+ throw new RatisRequestFailedException(e);
+ }
+ }
+ return writeResult;
+ }
+
/** NOTICE: transferLeader *does not guarantee* the leader be transferred to
newLeader. */
@Override
public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws
ConsensusException {
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
index 59f6cccd6f8..d9c0aca85de 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
@@ -241,6 +241,12 @@ class SimpleConsensus implements IConsensus {
return new ArrayList<>(stateMachineMap.keySet());
}
+ @Override
+ public TSStatus resetPeerList(ConsensusGroupId groupId, List<Peer> peers)
+ throws ConsensusException {
+ throw new ConsensusException("SimpleConsensus does not support reset peer
list");
+ }
+
private String buildPeerDir(ConsensusGroupId groupId) {
return storageDir + File.separator + groupId.getType().getValue() + "_" +
groupId.getId();
}