This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 4d2976dd25 IGNITE-18374 Remove RaftManager#prepareRaftGroup method (#1438) 4d2976dd25 is described below commit 4d2976dd25787d2ee047672ee4c94b297244de31 Author: Alexander Polovtcev <alex.polovt...@gmail.com> AuthorDate: Wed Dec 14 13:05:23 2022 +0300 IGNITE-18374 Remove RaftManager#prepareRaftGroup method (#1438) --- .../management/raft/ItCmgRaftServiceTest.java | 27 ++++++--- .../management/ClusterManagementGroupManager.java | 20 ++++--- .../client/ItMetaStorageServiceTest.java | 27 ++++++--- .../internal/metastorage/MetaStorageManager.java | 44 ++++++++------ .../apache/ignite/internal/raft/RaftManager.java | 41 +++---------- .../ignite/internal/raft/ItLearnersTest.java | 10 ++-- .../apache/ignite/internal/raft/ItLozaTest.java | 13 ++-- .../internal/raft/ItRaftGroupServiceTest.java | 14 ++--- .../java/org/apache/ignite/internal/raft/Loza.java | 70 ++++------------------ .../org/apache/ignite/internal/raft/LozaTest.java | 5 +- .../sql/engine/exec/MockedStructuresTest.java | 2 +- .../distributed/ItTxDistributedTestSingleNode.java | 11 ++-- 12 files changed, 120 insertions(+), 164 deletions(-) diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java index a17c26388d..8ee84e07b3 100644 --- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java +++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java @@ -21,7 +21,6 @@ import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.cluster.management.ClusterState.clusterState; import static org.apache.ignite.internal.cluster.management.ClusterTag.clusterTag; -import static org.apache.ignite.internal.cluster.management.CmgGroupId.INSTANCE; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will; @@ -43,6 +42,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.cluster.management.ClusterState; import org.apache.ignite.internal.cluster.management.ClusterTag; +import org.apache.ignite.internal.cluster.management.CmgGroupId; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory; import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand; import org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand; @@ -54,8 +54,11 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguratio import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.properties.IgniteProductVersion; import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; +import org.apache.ignite.internal.raft.RaftGroupEventsListener; import org.apache.ignite.internal.raft.RaftManager; +import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.testframework.WorkDirectory; @@ -117,12 +120,20 @@ public class ItCmgRaftServiceTest { .map(ClusterNode::name) .collect(collectingAndThen(toSet(), PeersAndLearners::fromConsistentIds)); - CompletableFuture<RaftGroupService> raftService = raftManager.prepareRaftGroup( - INSTANCE, - configuration.peer(clusterService.topologyService().localMember().name()), - configuration, - () -> new CmgRaftGroupListener(raftStorage, new LogicalTopologyImpl(raftStorage), term -> {}) - ); + Peer serverPeer = configuration.peer(localMember().name()); + + CompletableFuture<RaftGroupService> raftService; + + if (serverPeer == null) { + raftService = raftManager.startRaftGroupService(CmgGroupId.INSTANCE, configuration); + } else { + raftService = raftManager.startRaftGroupNode( + new RaftNodeId(CmgGroupId.INSTANCE, serverPeer), + configuration, + new CmgRaftGroupListener(raftStorage, new LogicalTopologyImpl(raftStorage), term -> {}), + RaftGroupEventsListener.noopLsnr + ); + } assertThat(raftService, willCompleteSuccessfully()); @@ -133,7 +144,7 @@ public class ItCmgRaftServiceTest { } void beforeNodeStop() throws NodeStoppingException { - raftManager.stopRaftNodes(INSTANCE); + raftManager.stopRaftNodes(CmgGroupId.INSTANCE); raftManager.beforeNodeStop(); clusterService.beforeNodeStop(); diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java index dcbae3f7aa..b46c9cbb99 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java @@ -22,7 +22,6 @@ import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.stream.Collectors.toSet; import static java.util.stream.Collectors.toUnmodifiableSet; import static org.apache.ignite.internal.cluster.management.ClusterTag.clusterTag; -import static org.apache.ignite.internal.cluster.management.CmgGroupId.INSTANCE; import java.util.Collection; import java.util.List; @@ -55,9 +54,11 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.properties.IgniteProductVersion; +import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupEventsListener; import org.apache.ignite.internal.raft.RaftManager; +import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.Status; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.IgniteSpinBusyLock; @@ -404,7 +405,7 @@ public class ClusterManagementGroupManager implements IgniteComponent { raftService = null; } - raftManager.stopRaftNodes(INSTANCE); + raftManager.stopRaftNodes(CmgGroupId.INSTANCE); localStateStorage.clear().get(); } catch (Exception e) { @@ -506,14 +507,17 @@ public class ClusterManagementGroupManager implements IgniteComponent { PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(nodeNames, learnerNames); + Peer serverPeer = isLearner ? configuration.learner(thisNodeConsistentId) : configuration.peer(thisNodeConsistentId); + + assert serverPeer != null; + try { return raftManager - .prepareRaftGroup( - INSTANCE, - isLearner ? configuration.learner(thisNodeConsistentId) : configuration.peer(thisNodeConsistentId), + .startRaftGroupNode( + new RaftNodeId(CmgGroupId.INSTANCE, serverPeer), configuration, - () -> new CmgRaftGroupListener(clusterStateStorage, logicalTopology, this::onLogicalTopologyChanged), - this::createCmgRaftGroupEventsListener + new CmgRaftGroupListener(clusterStateStorage, logicalTopology, this::onLogicalTopologyChanged), + createCmgRaftGroupEventsListener() ) .thenApply(service -> new CmgRaftService(service, clusterService, logicalTopology)); } catch (Exception e) { @@ -667,7 +671,7 @@ public class ClusterManagementGroupManager implements IgniteComponent { IgniteUtils.shutdownAndAwaitTermination(scheduledExecutor, 10, TimeUnit.SECONDS); - raftManager.stopRaftNodes(INSTANCE); + raftManager.stopRaftNodes(CmgGroupId.INSTANCE); // Fail the future to unblock dependent operations joinFuture.completeExceptionally(new NodeStoppingException()); diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java index f766e0a971..313c49a08d 100644 --- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java +++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java @@ -29,7 +29,6 @@ import static org.apache.ignite.internal.metastorage.client.ItMetaStorageService import static org.apache.ignite.internal.metastorage.client.Operations.ops; import static org.apache.ignite.internal.metastorage.client.Operations.put; import static org.apache.ignite.internal.metastorage.client.Operations.remove; -import static org.apache.ignite.internal.metastorage.common.MetastorageGroupId.INSTANCE; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses; import static org.apache.ignite.utils.ClusterServiceTestUtils.waitForTopology; @@ -69,6 +68,7 @@ import java.util.stream.Stream; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.metastorage.common.MetastorageGroupId; import org.apache.ignite.internal.metastorage.common.OperationType; import org.apache.ignite.internal.metastorage.server.AbstractCompoundCondition; import org.apache.ignite.internal.metastorage.server.AbstractSimpleCondition; @@ -83,8 +83,11 @@ import org.apache.ignite.internal.metastorage.server.ValueCondition; import org.apache.ignite.internal.metastorage.server.ValueCondition.Type; import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; +import org.apache.ignite.internal.raft.RaftGroupEventsListener; import org.apache.ignite.internal.raft.RaftManager; +import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.testframework.WorkDirectory; @@ -244,7 +247,8 @@ public class ItMetaStorageServiceTest { public void afterTest() throws Exception { Stream<AutoCloseable> stopRaftGroupServices = raftGroupServices.stream().map(service -> service::shutdown); - Stream<AutoCloseable> stopRaftGroups = raftManagers.stream().map(manager -> () -> manager.stopRaftNodes(INSTANCE)); + Stream<AutoCloseable> stopRaftGroups = raftManagers.stream() + .map(manager -> () -> manager.stopRaftNodes(MetastorageGroupId.INSTANCE)); Stream<AutoCloseable> beforeNodeStop = Stream.concat(raftManagers.stream(), cluster.stream()).map(c -> c::beforeNodeStop); @@ -906,7 +910,7 @@ public class ItMetaStorageServiceTest { PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(Set.of(localName)); RaftGroupService metaStorageRaftSvc2 = raftManagers.get(1) - .startRaftGroupService(INSTANCE, configuration) + .startRaftGroupService(MetastorageGroupId.INSTANCE, configuration) .get(3, TimeUnit.SECONDS); raftGroupServices.add(metaStorageRaftSvc2); @@ -1080,11 +1084,16 @@ public class ItMetaStorageServiceTest { raftManagers.add(raftManager); - return raftManager.prepareRaftGroup( - INSTANCE, - configuration.peer(node.topologyService().localMember().name()), - configuration, - () -> new MetaStorageListener(mockStorage) - ); + Peer serverPeer = configuration.peer(node.topologyService().localMember().name()); + + if (serverPeer == null) { + return raftManager.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration); + } else { + var nodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, serverPeer); + + return raftManager.startRaftGroupNode( + nodeId, configuration, new MetaStorageListener(mockStorage), RaftGroupEventsListener.noopLsnr + ); + } } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java index 0e5f5ae9e0..fdb20aa7e1 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.metastorage; -import static org.apache.ignite.internal.metastorage.common.MetastorageGroupId.INSTANCE; import static org.apache.ignite.internal.util.ByteUtils.bytesToLong; import static org.apache.ignite.internal.util.ByteUtils.longToBytes; import static org.apache.ignite.lang.ErrorGroups.MetaStorage.CURSOR_CLOSING_ERR; @@ -45,6 +44,7 @@ import org.apache.ignite.internal.metastorage.client.OperationTimeoutException; import org.apache.ignite.internal.metastorage.client.StatementResult; import org.apache.ignite.internal.metastorage.client.WatchListener; import org.apache.ignite.internal.metastorage.common.MetaStorageException; +import org.apache.ignite.internal.metastorage.common.MetastorageGroupId; import org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; import org.apache.ignite.internal.metastorage.watch.AggregatedWatch; @@ -52,7 +52,9 @@ import org.apache.ignite.internal.metastorage.watch.KeyCriterion; import org.apache.ignite.internal.metastorage.watch.WatchAggregator; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; +import org.apache.ignite.internal.raft.RaftGroupEventsListener; import org.apache.ignite.internal.raft.RaftManager; +import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.internal.util.IgniteSpinBusyLock; @@ -166,29 +168,33 @@ public class MetaStorageManager implements IgniteComponent { Peer localPeer = configuration.peer(thisNode.name()); - if (localPeer != null) { - clusterService.topologyService().addEventHandler(new TopologyEventHandler() { - @Override - public void onDisappeared(ClusterNode member) { - metaStorageSvcFut.thenAccept(svc -> svc.closeCursors(member.id())); - } - }); - - storage.start(); - } + CompletableFuture<RaftGroupService> raftServiceFuture; try { - CompletableFuture<RaftGroupService> raftServiceFuture = raftMgr.prepareRaftGroup( - INSTANCE, - localPeer, - configuration, - () -> new MetaStorageListener(storage) - ); + if (localPeer == null) { + raftServiceFuture = raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration); + } else { + clusterService.topologyService().addEventHandler(new TopologyEventHandler() { + @Override + public void onDisappeared(ClusterNode member) { + metaStorageSvcFut.thenAccept(svc -> svc.closeCursors(member.id())); + } + }); + + storage.start(); - return raftServiceFuture.thenApply(service -> new MetaStorageServiceImpl(service, thisNode.id(), thisNode.name())); + raftServiceFuture = raftMgr.startRaftGroupNode( + new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer), + configuration, + new MetaStorageListener(storage), + RaftGroupEventsListener.noopLsnr + ); + } } catch (NodeStoppingException e) { return CompletableFuture.failedFuture(e); } + + return raftServiceFuture.thenApply(service -> new MetaStorageServiceImpl(service, thisNode.id(), thisNode.name())); } /** {@inheritDoc} */ @@ -229,7 +235,7 @@ public class MetaStorageManager implements IgniteComponent { IgniteUtils.closeAll( this::stopDeployedWatches, () -> { - if (raftMgr.stopRaftNodes(INSTANCE)) { + if (raftMgr.stopRaftNodes(MetastorageGroupId.INSTANCE)) { storage.close(); } } diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java index 0d3cc0d37c..09446af8a5 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java @@ -18,55 +18,30 @@ package org.apache.ignite.internal.raft; import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.lang.NodeStoppingException; -import org.jetbrains.annotations.Nullable; /** * Raft manager. */ public interface RaftManager extends IgniteComponent { /** - * Optionally starts a Raft node and creates a Raft group service providing operations on a Raft group. + * Starts a Raft group and a Raft service on the current node. * - * @param groupId Raft group ID. - * @param serverPeer Local peer that will host the Raft node. If {@code null} - no nodes will be started, but only the Raft client - * service. - * @param configuration Peers and Learners of the Raft group. - * @param lsnrSupplier Raft group listener supplier. - * @return Future representing pending completion of the operation. - * @throws NodeStoppingException If node stopping intention was detected. - */ - // TODO: remove this method, see https://issues.apache.org/jira/browse/IGNITE-18374 - CompletableFuture<RaftGroupService> prepareRaftGroup( - ReplicationGroupId groupId, - @Nullable Peer serverPeer, - PeersAndLearners configuration, - Supplier<RaftGroupListener> lsnrSupplier - ) throws NodeStoppingException; - - /** - * Optionally starts a Raft node and creates a Raft group service providing operations on a Raft group. - * - * @param groupId Raft group ID. - * @param serverPeer Local peer that will host the Raft node. If {@code null} - no nodes will be started, but only the Raft client - * service. + * @param nodeId Raft node ID. * @param configuration Peers and Learners of the Raft group. - * @param lsnrSupplier Raft group listener supplier. - * @param raftGrpEvtsLsnrSupplier Raft group events listener supplier. - * @return Future representing pending completion of the operation. + * @param lsnr Raft group listener. + * @param eventsLsnr Raft group events listener. * @throws NodeStoppingException If node stopping intention was detected. */ - CompletableFuture<RaftGroupService> prepareRaftGroup( - ReplicationGroupId groupId, - @Nullable Peer serverPeer, + CompletableFuture<RaftGroupService> startRaftGroupNode( + RaftNodeId nodeId, PeersAndLearners configuration, - Supplier<RaftGroupListener> lsnrSupplier, - Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier + RaftGroupListener lsnr, + RaftGroupEventsListener eventsLsnr ) throws NodeStoppingException; /** diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java index 5852345397..b0e5fe49ba 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java @@ -224,7 +224,7 @@ public class ItLearnersTest extends IgniteAbstractTest { RaftNode learner1 = nodes.get(1); CompletableFuture<RaftGroupService> service2 = - startRaftGroup(learner1, configuration.learner(learner1.consistentId()), newConfiguration, new TestRaftGroupListener()); + startRaftGroup(learner1, newConfiguration.learner(learner1.consistentId()), newConfiguration, new TestRaftGroupListener()); // Check that learners and peers have been set correctly. Stream.of(service1, service2).forEach(service -> { @@ -401,11 +401,11 @@ public class ItLearnersTest extends IgniteAbstractTest { RaftGroupListener listener ) { try { - CompletableFuture<RaftGroupService> future = node.loza.prepareRaftGroup( - RAFT_GROUP_ID, - serverPeer, + CompletableFuture<RaftGroupService> future = node.loza.startRaftGroupNode( + new RaftNodeId(RAFT_GROUP_ID, serverPeer), configuration, - () -> listener + listener, + RaftGroupEventsListener.noopLsnr ); return future.thenApply(s -> { diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java index c9374ca0cd..fbcf3dcf3a 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java @@ -35,7 +35,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClockImpl; @@ -77,17 +76,15 @@ public class ItLozaTest { * @return Raft group service. */ private RaftGroupService startClient(TestReplicationGroupId groupId, ClusterNode node, Loza loza) throws Exception { - Supplier<RaftGroupListener> raftGroupListenerSupplier = () -> { - RaftGroupListener raftGroupListener = mock(RaftGroupListener.class); + RaftGroupListener raftGroupListener = mock(RaftGroupListener.class); - when(raftGroupListener.onSnapshotLoad(any())).thenReturn(true); - - return raftGroupListener; - }; + when(raftGroupListener.onSnapshotLoad(any())).thenReturn(true); PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(Set.of(node.name())); - return loza.prepareRaftGroup(groupId, configuration.peer(node.name()), configuration, raftGroupListenerSupplier) + var nodeId = new RaftNodeId(groupId, configuration.peer(node.name())); + + return loza.startRaftGroupNode(nodeId, configuration, raftGroupListener, RaftGroupEventsListener.noopLsnr) .get(10, TimeUnit.SECONDS); } diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java index 226c4045bf..10a4a25466 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java @@ -224,14 +224,14 @@ public class ItRaftGroupServiceTest extends IgniteAbstractTest { } CompletableFuture<RaftGroupService> startRaftGroup(PeersAndLearners configuration) { + String nodeName = clusterService.topologyService().localMember().name(); + + Peer serverPeer = configuration.peer(nodeName); + + var nodeId = new RaftNodeId(RAFT_GROUP_NAME, serverPeer == null ? configuration.learner(nodeName) : serverPeer); + try { - raftGroupService = loza.prepareRaftGroup( - RAFT_GROUP_NAME, - configuration.peer(clusterService.topologyService().localMember().name()), - configuration, - () -> mock(RaftGroupListener.class), - () -> eventsListener - ); + raftGroupService = loza.startRaftGroupNode(nodeId, configuration, mock(RaftGroupListener.class), eventsListener); } catch (NodeStoppingException e) { return CompletableFuture.failedFuture(e); } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java index 0123e96155..968313d7c8 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.raft; -import static org.apache.ignite.internal.raft.RaftGroupEventsListener.noopLsnr; - import java.nio.file.Path; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -26,7 +24,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.logger.IgniteLogger; @@ -166,62 +163,13 @@ public class Loza implements RaftManager { } @Override - public CompletableFuture<RaftGroupService> prepareRaftGroup( - ReplicationGroupId groupId, - @Nullable Peer serverPeer, - PeersAndLearners configuration, - Supplier<RaftGroupListener> lsnrSupplier - ) throws NodeStoppingException { - return prepareRaftGroup(groupId, serverPeer, configuration, lsnrSupplier, () -> noopLsnr, RaftGroupOptions.defaults()); - } - - @Override - public CompletableFuture<RaftGroupService> prepareRaftGroup( - ReplicationGroupId groupId, - @Nullable Peer serverPeer, - PeersAndLearners configuration, - Supplier<RaftGroupListener> lsnrSupplier, - Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier - ) throws NodeStoppingException { - return prepareRaftGroup(groupId, serverPeer, configuration, lsnrSupplier, raftGrpEvtsLsnrSupplier, RaftGroupOptions.defaults()); - } - - /** - * Optionally starts a Raft node and creates a Raft group service providing operations on a Raft group. - * - * @param groupId Raft group ID. - * @param serverPeer Local peer that will host the Raft node. If {@code null} - no nodes will be started, but only the Raft client - * service. - * @param configuration Peers and Learners of the Raft group. - * @param lsnrSupplier Raft group listener supplier. - * @param raftGrpEvtsLsnrSupplier Raft group events listener supplier. - * @param groupOptions Options to apply to the group. - * @return Future representing pending completion of the operation. - * @throws NodeStoppingException If node stopping intention was detected. - */ - private CompletableFuture<RaftGroupService> prepareRaftGroup( - ReplicationGroupId groupId, - @Nullable Peer serverPeer, + public CompletableFuture<RaftGroupService> startRaftGroupNode( + RaftNodeId nodeId, PeersAndLearners configuration, - Supplier<RaftGroupListener> lsnrSupplier, - Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier, - RaftGroupOptions groupOptions + RaftGroupListener lsnr, + RaftGroupEventsListener eventsLsnr ) throws NodeStoppingException { - if (!busyLock.enterBusy()) { - throw new NodeStoppingException(); - } - - try { - if (serverPeer != null) { - var nodeId = new RaftNodeId(groupId, serverPeer); - - startRaftGroupNodeInternal(nodeId, configuration, lsnrSupplier.get(), raftGrpEvtsLsnrSupplier.get(), groupOptions); - } - - return startRaftGroupServiceInternal(groupId, configuration); - } finally { - busyLock.leaveBusy(); - } + return startRaftGroupNode(nodeId, configuration, lsnr, eventsLsnr, RaftGroupOptions.defaults()); } /** @@ -234,7 +182,7 @@ public class Loza implements RaftManager { * @param groupOptions Options to apply to the group. * @throws NodeStoppingException If node stopping intention was detected. */ - public void startRaftGroupNode( + public CompletableFuture<RaftGroupService> startRaftGroupNode( RaftNodeId nodeId, PeersAndLearners configuration, RaftGroupListener lsnr, @@ -246,7 +194,7 @@ public class Loza implements RaftManager { } try { - startRaftGroupNodeInternal(nodeId, configuration, lsnr, eventsLsnr, groupOptions); + return startRaftGroupNodeInternal(nodeId, configuration, lsnr, eventsLsnr, groupOptions); } finally { busyLock.leaveBusy(); } @@ -268,7 +216,7 @@ public class Loza implements RaftManager { } } - private void startRaftGroupNodeInternal( + private CompletableFuture<RaftGroupService> startRaftGroupNodeInternal( RaftNodeId nodeId, PeersAndLearners configuration, RaftGroupListener lsnr, @@ -287,6 +235,8 @@ public class Loza implements RaftManager { nodeId )); } + + return startRaftGroupServiceInternal(nodeId.groupId(), configuration); } private CompletableFuture<RaftGroupService> startRaftGroupServiceInternal(ReplicationGroupId grpId, PeersAndLearners configuration) { diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java index e12f4ab839..d32d000b2c 100644 --- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java @@ -72,10 +72,13 @@ public class LozaTest extends IgniteAbstractTest { Peer serverPeer = configuration.peer("test1"); + assertThrows( + NodeStoppingException.class, + () -> loza.startRaftGroupNode(new RaftNodeId(raftGroupId, serverPeer), configuration, null, null) + ); assertThrows(NodeStoppingException.class, () -> loza.startRaftGroupService(raftGroupId, configuration)); assertThrows(NodeStoppingException.class, () -> loza.stopRaftNode(new RaftNodeId(raftGroupId, serverPeer))); assertThrows(NodeStoppingException.class, () -> loza.stopRaftNodes(raftGroupId)); - assertThrows(NodeStoppingException.class, () -> loza.prepareRaftGroup(raftGroupId, serverPeer, configuration, () -> null)); } /** diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java index f2e26cb317..feb8242f57 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java @@ -442,7 +442,7 @@ public class MockedStructuresTest extends IgniteAbstractTest { * @return Table manager. */ private TableManager mockManagers() throws NodeStoppingException { - when(rm.prepareRaftGroup(any(), any(), any(), any())).thenAnswer(mock -> { + when(rm.startRaftGroupNode(any(), any(), any(), any())).thenAnswer(mock -> { RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class); when(raftGrpSrvcMock.leader()).thenReturn(new Peer("test")); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java index 9da886c7b7..9673817b74 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; +import org.apache.ignite.internal.raft.RaftGroupEventsListener; import org.apache.ignite.internal.raft.RaftGroupServiceImpl; import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; @@ -416,17 +417,17 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest { PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(partAssignments); - CompletableFuture<Void> partitionReadyFuture = raftServers.get(assignment).prepareRaftGroup( - grpId, - configuration.peer(assignment), + CompletableFuture<Void> partitionReadyFuture = raftServers.get(assignment).startRaftGroupNode( + new RaftNodeId(grpId, configuration.peer(assignment)), configuration, - () -> new PartitionListener( + new PartitionListener( new TestPartitionDataStorage(testMpPartStorage), new TestTxStateStorage(), txManagers.get(assignment), () -> Map.of(pkStorage.get().id(), pkStorage.get()), partId - ) + ), + RaftGroupEventsListener.noopLsnr ).thenAccept( raftSvc -> { try {