This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d7188c12285 IGNITE-26000 Implement method to get on-disk group IDs in
Raft meta storage (#6308)
d7188c12285 is described below
commit d7188c12285e0a0df29f049a1e2e5ce2f599db05
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Jul 25 11:46:33 2025 +0400
IGNITE-26000 Implement method to get on-disk group IDs in Raft meta storage
(#6308)
---
.../apache/ignite/internal/raft/RaftNodeId.java | 22 ++++++++-
.../ignite/internal/raft/StoredRaftNodeId.java | 57 ++++++++++++++++++++++
...tDestructorTest.java => ItJraftServerTest.java} | 56 ++++++++++++++++-----
.../ignite/raft/server/JraftAbstractTest.java | 2 +-
.../server/impl/GroupStoragesContextResolver.java | 10 ++++
.../internal/raft/server/impl/JraftServerImpl.java | 40 ++++++++++++++-
6 files changed, 172 insertions(+), 15 deletions(-)
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftNodeId.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftNodeId.java
index 65d3eeab06c..68daad75054 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftNodeId.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftNodeId.java
@@ -25,6 +25,8 @@ import org.apache.ignite.internal.tostring.S;
* Raft node identifier, consists of a Raft group ID and a Peer ID.
*/
public class RaftNodeId {
+ private static final String PEER_INDEX_DELIMITER = "-";
+
private final ReplicationGroupId groupId;
private final Peer peer;
@@ -42,9 +44,27 @@ public class RaftNodeId {
/**
* Returns string which represents this Raft node ID when used in file
paths and RocksDB prefixes.
+ *
+ * @see #fromNodeIdStringForStorage(String, String)
*/
public String nodeIdStringForStorage() {
- return groupId().toString() + "-" + peer().idx();
+ return groupId().toString() + PEER_INDEX_DELIMITER + peer().idx();
+ }
+
+ /**
+ * Parses string representation of {@link RaftNodeId} and returns it as
{@link StoredRaftNodeId}.
+ *
+ * @param nodeIdStr String to parse (it's produced by {@link
#nodeIdStringForStorage()}).
+ * @param localNodeConsistentId Name of the local node.
+ * @see #nodeIdStringForStorage()
+ */
+ public static StoredRaftNodeId fromNodeIdStringForStorage(String
nodeIdStr, String localNodeConsistentId) {
+ int separatorIndex = nodeIdStr.lastIndexOf(PEER_INDEX_DELIMITER);
+
+ int peerIndex = Integer.parseInt(nodeIdStr.substring(separatorIndex +
PEER_INDEX_DELIMITER.length()));
+ Peer peer = new Peer(localNodeConsistentId, peerIndex);
+
+ return new StoredRaftNodeId(nodeIdStr.substring(0, separatorIndex),
peer);
}
/**
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/StoredRaftNodeId.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/StoredRaftNodeId.java
new file mode 100644
index 00000000000..27c904281c2
--- /dev/null
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/StoredRaftNodeId.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import java.util.Objects;
+
+/**
+ * Stored Raft node ID. It holds the subset of information contained in {@link
RaftNodeId} that can be restored from its string representation used for storage.
+ *
+ * @see RaftNodeId
+ */
+public class StoredRaftNodeId {
+ private final String groupIdName;
+ private final Peer peer;
+
+ public StoredRaftNodeId(String groupIdName, Peer peer) {
+ this.groupIdName = groupIdName;
+ this.peer = peer;
+ }
+
+ public String groupIdName() {
+ return groupIdName;
+ }
+
+ public Peer peer() {
+ return peer;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ StoredRaftNodeId that = (StoredRaftNodeId) o;
+ return Objects.equals(groupIdName, that.groupIdName) &&
Objects.equals(peer, that.peer);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(groupIdName, peer);
+ }
+}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftDestructorTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftServerTest.java
similarity index 69%
rename from
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftDestructorTest.java
rename to
modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftServerTest.java
index bff16da9c70..bac5d111223 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftDestructorTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftServerTest.java
@@ -18,10 +18,13 @@
package org.apache.ignite.raft.server;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -33,17 +36,31 @@ import java.util.Objects;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.StoredRaftNodeId;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.LogStorageException;
import org.apache.ignite.internal.replicator.TestReplicationGroupId;
+import org.apache.ignite.raft.jraft.core.TestCluster;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-/** Tests that check that failed storage destruction is finished after server
restart. */
-public class ItJraftDestructorTest extends JraftAbstractTest {
+class ItJraftServerTest extends JraftAbstractTest {
private static final int SERVER_INDEX = 0;
+ private JraftServerImpl server;
+ private Path serverDataPath;
+
+ private final TestReplicationGroupId testReplicationGroupId = new
TestReplicationGroupId("test");
+
+ @BeforeEach
+ void setUp() {
+ server = startServer(SERVER_INDEX);
+ serverDataPath = serverWorkingDirs.get(SERVER_INDEX).metaPath();
+ }
+
@Test
void testFinishStorageDestructionAfterRestart() throws Exception {
doTestFinishStorageDestructionAfterRestart(false);
@@ -61,10 +78,7 @@ public class ItJraftDestructorTest extends JraftAbstractTest
{
}
private void doTestFinishStorageDestructionAfterRestart(boolean
isVolatile) throws Exception {
- JraftServerImpl server = startServer(SERVER_INDEX);
- Path serverDataPath = serverWorkingDirs.get(SERVER_INDEX).basePath();
-
- RaftNodeId nodeId = getRaftNodeId(server);
+ RaftNodeId nodeId = testGroupRaftNodeId();
Path nodeDataPath = createServerDataPathForNode(serverDataPath,
nodeId);
@@ -73,7 +87,7 @@ public class ItJraftDestructorTest extends JraftAbstractTest {
LogStorageFactory logStorageFactory =
logStorageFactories.get(SERVER_INDEX);
doThrow(LogStorageException.class).doCallRealMethod().when(logStorageFactory).destroyLogStorage(anyString());
- RaftGroupOptions groupOptions = getRaftGroupOptions(isVolatile,
logStorageFactory, serverDataPath);
+ RaftGroupOptions groupOptions = getRaftGroupOptions(isVolatile,
logStorageFactory);
assertThrows(
IgniteInternalException.class,
@@ -93,11 +107,17 @@ public class ItJraftDestructorTest extends
JraftAbstractTest {
assertFalse(Files.exists(nodeDataPath));
}
- private RaftNodeId getRaftNodeId(JraftServerImpl server) {
+ private RaftNodeId testGroupRaftNodeId() {
+ return new RaftNodeId(testReplicationGroupId, localPeer());
+ }
+
+ private Peer localPeer() {
String localNodeName =
server.clusterService().topologyService().localMember().name();
- Peer peer =
Objects.requireNonNull(initialMembersConf.peer(localNodeName));
+ return localPeer(localNodeName);
+ }
- return new RaftNodeId(new TestReplicationGroupId("test"), peer);
+ private Peer localPeer(String localNodeName) {
+ return Objects.requireNonNull(initialMembersConf.peer(localNodeName));
}
private static Path createServerDataPathForNode(Path serverDataPath,
RaftNodeId nodeId) throws IOException {
@@ -108,13 +128,25 @@ public class ItJraftDestructorTest extends
JraftAbstractTest {
return nodeDataPath;
}
- private static RaftGroupOptions getRaftGroupOptions(boolean isVolatile,
LogStorageFactory logStorageFactory, Path serverDataPath) {
+ private RaftGroupOptions getRaftGroupOptions(boolean isVolatile,
LogStorageFactory logStorageFactory) {
RaftGroupOptions groupOptions = isVolatile ?
RaftGroupOptions.forVolatileStores() : RaftGroupOptions.forPersistentStores();
-
groupOptions.setLogStorageFactory(logStorageFactory).serverDataPath(serverDataPath);
+ groupOptions.setLogStorageFactory(logStorageFactory);
+ groupOptions.serverDataPath(serverDataPath);
+
groupOptions.commandsMarshaller(TestCluster.commandsMarshaller(server.clusterService()));
return groupOptions;
}
private JraftServerImpl startServer(int index) {
return startServer(index, x -> {}, opts -> {});
}
+
+ @Test
+ void listsGroupIdsOnDisk() {
+ RaftNodeId nodeId = testGroupRaftNodeId();
+
+ RaftGroupOptions groupOptions = getRaftGroupOptions(false,
logStorageFactories.get(SERVER_INDEX));
+ assertTrue(server.startRaftNode(nodeId, initialMembersConf,
mock(RaftGroupListener.class), groupOptions));
+
+ assertThat(server.raftNodeIdsOnDisk(), contains(new
StoredRaftNodeId(nodeId.groupId().toString(), nodeId.peer())));
+ }
}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
index ee64dfc0de4..fd92816458b 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
@@ -233,7 +233,7 @@ public abstract class JraftAbstractTest extends
RaftServerAbstractTest {
GroupStoragesContextResolver groupStoragesContextResolver = new
GroupStoragesContextResolver(
replicationGroupId -> groupName,
- Map.of(groupName, workingDir.basePath()),
+ Map.of(groupName, workingDir.metaPath()),
Map.of(groupName, partitionsLogStorageFactory)
);
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/GroupStoragesContextResolver.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/GroupStoragesContextResolver.java
index 1f98e7c92a2..2568d3ab62b 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/GroupStoragesContextResolver.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/GroupStoragesContextResolver.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.raft.server.impl;
import java.nio.file.Path;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.ignite.internal.raft.RaftNodeId;
@@ -53,4 +55,12 @@ public class GroupStoragesContextResolver {
StorageDestructionIntent getIntent(RaftNodeId nodeId, boolean isVolatile) {
return new StorageDestructionIntent(nodeId.nodeIdStringForStorage(),
groupNameResolver.apply(nodeId.groupId()), isVolatile);
}
+
+ Collection<Path> serverDataPaths() {
+ return List.copyOf(serverDataPathByGroupName.values());
+ }
+
+ Collection<LogStorageFactory> logStorageFactories() {
+ return List.copyOf(logStorageFactoryByGroupName.values());
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index e0979745a53..40beb1680a0 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -21,10 +21,12 @@ import static
java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableList;
+import static java.util.stream.Collectors.toUnmodifiableSet;
import static
org.apache.ignite.internal.thread.ThreadOperation.PROCESS_RAFT_REQ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import java.io.File;
import java.io.IOException;
@@ -47,6 +49,7 @@ import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.failure.FailureType;
@@ -65,6 +68,7 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.StoredRaftNodeId;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.server.RaftServer;
@@ -621,12 +625,46 @@ public class JraftServerImpl implements RaftServer {
// This destroys both meta storage and snapshots storage as they
are stored under nodeDataPath.
IgniteUtils.deleteIfExistsThrowable(dataPath);
} catch (Exception e) {
- throw new IgniteInternalException("Failed to delete storage for
node: " + nodeId, e);
+ throw new IgniteInternalException(INTERNAL_ERR, "Failed to delete
storage for node: " + nodeId, e);
}
groupStoragesDestructionIntents.removeStorageDestructionIntent(nodeId);
}
+ /**
+ * Returns Raft node IDs for all groups that are present on disk.
+ *
+ * <p>This method should only be called when no Raft nodes are started or
being started.
+ */
+ public Set<StoredRaftNodeId> raftNodeIdsOnDisk() {
+ Set<String> groupIdsForStorage = new HashSet<>();
+
+ for (LogStorageFactory logStorageFactory :
groupStoragesContextResolver.logStorageFactories()) {
+
groupIdsForStorage.addAll(logStorageFactory.raftNodeStorageIdsOnDisk());
+ }
+ groupIdsForStorage.addAll(raftNodeMetaStorageIdsOnDisk());
+
+ return groupIdsForStorage.stream()
+ .map(nodeIdStr ->
RaftNodeId.fromNodeIdStringForStorage(nodeIdStr, service.nodeName()))
+ .collect(toUnmodifiableSet());
+ }
+
+ private Set<String> raftNodeMetaStorageIdsOnDisk() {
+ return groupStoragesContextResolver.serverDataPaths().stream()
+ .flatMap(JraftServerImpl::listFiles)
+ .filter(Files::isDirectory)
+ .map(groupDirPath -> groupDirPath.getFileName().toString())
+ .collect(toUnmodifiableSet());
+ }
+
+ private static Stream<Path> listFiles(Path dir) {
+ try {
+ return Files.list(dir);
+ } catch (IOException e) {
+ throw new IgniteInternalException(INTERNAL_ERR, e);
+ }
+ }
+
@Override
public @Nullable IndexWithTerm raftNodeIndex(RaftNodeId nodeId) {
RaftGroupService service = nodes.get(nodeId);