This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 5897877995 IGNITE-20425 Fix corrupted Raft FSM state after restart (#2839) 5897877995 is described below commit 5897877995f8ccda008bcf06252accbf8c9a8905 Author: Alexander Lapin <lapin1...@gmail.com> AuthorDate: Wed Nov 29 15:06:17 2023 +0200 IGNITE-20425 Fix corrupted Raft FSM state after restart (#2839) --- .../raft/server/ItJraftCounterServerTest.java | 4 +- .../java/org/apache/ignite/internal/raft/Loza.java | 17 - .../ignite/internal/raft/server/RaftServer.java | 9 - .../internal/raft/server/impl/JraftServerImpl.java | 10 - .../apache/ignite/raft/jraft/RaftGroupService.java | 8 - .../apache/ignite/raft/jraft/core/NodeImpl.java | 41 -- .../ItPlacementDriverReplicaSideTest.java | 3 +- .../ignite/distributed/ItTablePersistenceTest.java | 565 --------------------- .../internal/table/distributed/TableManager.java | 5 +- .../table/distributed/TableManagerTest.java | 1 - 10 files changed, 3 insertions(+), 660 deletions(-) diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java index 9835fc8f6f..eb57322d9c 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java @@ -663,13 +663,11 @@ class ItJraftCounterServerTest extends JraftAbstractTest { .snapshotStorageFactory(new SnapshotInMemoryStorageFactory(snapshotMetaStorage)); raftServer.startRaftNode(new RaftNodeId(grpId, serverPeer), initialMembersConf, listener, opts); - - raftServer.raftNodeReadyFuture(grpId).join(); }, opts -> {}); } for (AtomicInteger counter : counters.values()) { - assertEquals(3, counter.get()); + assertTrue(waitForCondition(() -> counter.get() == 3, 10_000)); } } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java index b6a80931c0..50a073b805 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java @@ -268,9 +268,6 @@ public class Loza implements RaftManager { ) throws NodeStoppingException { CompletableFuture<RaftGroupService> fut = startRaftGroupNode(nodeId, configuration, lsnr, eventsLsnr, RaftGroupOptions.defaults()); - // TODO: https://issues.apache.org/jira/browse/IGNITE-19047 Meta storage and cmg raft log re-application in async manner - raftServer.raftNodeReadyFuture(nodeId.groupId()).join(); - return fut; } @@ -310,9 +307,6 @@ public class Loza implements RaftManager { factory ); - // TODO: https://issues.apache.org/jira/browse/IGNITE-19047 Meta storage and cmg raft log re-application in async manner - raftServer.raftNodeReadyFuture(nodeId.groupId()).join(); - return startRaftServiceFuture; } finally { busyLock.leaveBusy(); @@ -414,17 +408,6 @@ public class Loza implements RaftManager { return raftServer.isStarted(nodeId); } - /** - * Gets a future that completes when all committed updates are applied to state machine after the RAFT node start. - * TODO: IGNITE-18273 The method should be defined in RaftManager and takes RaftNodeId instead of its argument. - * - * @param groupId Raft group id. - * @return Future to last applied revision. - */ - public CompletableFuture<Long> raftNodeReadyFuture(ReplicationGroupId groupId) { - return raftServer.raftNodeReadyFuture(groupId); - } - @Override public boolean stopRaftNode(RaftNodeId nodeId) throws NodeStoppingException { if (!busyLock.enterBusy()) { diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java index d016dd9069..910d384e5f 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.raft.server; import java.util.List; import java.util.Set; -import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; @@ -85,14 +84,6 @@ public interface RaftServer extends IgniteComponent { */ boolean isStarted(RaftNodeId nodeId); - /** - * Returns a future, which complete when the raft node is ready and committed updates are applied. - * - * @param groupId Raft group ID. - * @return A future to last applied revision on start. - */ - CompletableFuture<Long> raftNodeReadyFuture(ReplicationGroupId groupId); - /** * Stops a given local Raft node if it exists. * diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index ca9393947d..9e83cbaf96 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -30,10 +30,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map.Entry; import java.util.Queue; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -523,14 +521,6 @@ public class JraftServerImpl implements RaftServer { return nodes.containsKey(nodeId); } - @Override - public CompletableFuture<Long> raftNodeReadyFuture(ReplicationGroupId groupId) { - RaftGroupService jraftNode = nodes.entrySet().stream().filter(entry -> entry.getKey().groupId().equals(groupId)) - .map(Entry::getValue).findAny().get(); - - return jraftNode.getApplyCommittedFuture(); - } - @Override public boolean stopRaftNode(RaftNodeId nodeId) { RaftGroupService svc = nodes.remove(nodeId); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java index c81a741e90..3f2b13f79d 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java @@ -155,14 +155,6 @@ public class RaftGroupService { return this.node; } - /** - * Gets a future which complete when all committed update are applied to the node's state machine on start. - * @return Future completes when this node committed revision would be equal to the applied one. - */ - public CompletableFuture<Long> getApplyCommittedFuture() { - return node.getApplyCommittedFuture(); - } - public synchronized void shutdown() { // TODO asch remove handlers before shutting down raft node https://issues.apache.org/jira/browse/IGNITE-14519 if (!this.started) { diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java index 7b939bc346..259fa29269 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java @@ -152,9 +152,6 @@ public class NodeImpl implements Node, RaftServerService { .writeLock(); protected final Lock readLock = this.readWriteLock .readLock(); - - /** The future completes when all committed actions applied to RAFT state machine. */ - private final CompletableFuture<Long> applyCommittedFuture; private volatile State state; private volatile CountDownLatch shutdownLatch; private long currTerm; @@ -577,7 +574,6 @@ public class NodeImpl implements Node, RaftServerService { updateLastLeaderTimestamp(Utils.monotonicMs()); this.confCtx = new ConfigurationCtx(this); this.wakingCandidate = null; - this.applyCommittedFuture = new CompletableFuture<>(); this.ownFsmCallerExecutorDisruptorConfig = ownFsmCallerExecutorDisruptorConfig; } @@ -1084,29 +1080,6 @@ public class NodeImpl implements Node, RaftServerService { // Adds metric registry to RPC service. this.options.setMetricRegistry(this.metrics.getMetricRegistry()); - // Wait committed. - long commitIdx = logManager.getLastLogIndex(); - - CompletableFuture<Long> logApplyComplition = new CompletableFuture<>(); - - if (commitIdx > fsmCaller.getLastAppliedIndex()) { - LastAppliedLogIndexListener lnsr = new LastAppliedLogIndexListener() { - @Override - public void onApplied( long lastAppliedLogIndex) { - if (lastAppliedLogIndex >= commitIdx) { - logApplyComplition.complete(lastAppliedLogIndex); - fsmCaller.removeLastAppliedLogIndexListener(this); - } - } - }; - - fsmCaller.addLastAppliedLogIndexListener(lnsr); - - fsmCaller.onCommitted(commitIdx); - } else { - logApplyComplition.complete(fsmCaller.getLastAppliedIndex()); - } - if (!this.rpcClientService.init(this.options)) { LOG.error("Fail to init rpc service."); return false; @@ -1125,10 +1098,6 @@ public class NodeImpl implements Node, RaftServerService { return false; } - logApplyComplition.whenComplete((committedIdx, err) -> { - if (err != null) { - LOG.error("Fail to apply committed updates.", err); - } // set state to follower this.state = State.STATE_FOLLOWER; @@ -1159,19 +1128,9 @@ public class NodeImpl implements Node, RaftServerService { this.writeLock.unlock(); } - applyCommittedFuture.complete(commitIdx); - }); - return true; } - /** - * Gets a future which complete when all committed update are applied to the node's state machine on start. - * @return Future completes when this node committed revision would be equal to the applied one. - */ - public CompletableFuture<Long> getApplyCommittedFuture() { - return applyCommittedFuture; - } /** * Validates a required option if shared pools are enabled. * diff --git a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java index 29087c5199..1197b80612 100644 --- a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java +++ b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.replicator; -import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds; @@ -450,7 +449,7 @@ public class ItPlacementDriverReplicaSideTest extends IgniteAbstractTest { replicaManager.startReplica( groupId, - allOf(raftManager.raftNodeReadyFuture(groupId)), + completedFuture(null), (request, senderId) -> { log.info("Handle request [type={}]", request.getClass().getSimpleName()); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java deleted file mode 100644 index 35c2815e11..0000000000 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java +++ /dev/null @@ -1,565 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.distributed; - -import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS; -import static org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME; -import static org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener.tablePartitionId; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; -import static org.apache.ignite.internal.util.ArrayUtils.asList; -import static org.apache.ignite.internal.util.IgniteUtils.closeAll; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.mockito.Answers.RETURNS_DEEP_STUBS; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; -import java.nio.ByteBuffer; -import java.nio.file.Path; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.BooleanSupplier; -import java.util.function.Function; -import org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; -import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; -import org.apache.ignite.internal.hlc.HybridClock; -import org.apache.ignite.internal.hlc.HybridClockImpl; -import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.placementdriver.TestPlacementDriver; -import org.apache.ignite.internal.raft.Marshaller; -import org.apache.ignite.internal.raft.server.RaftServer; -import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; -import org.apache.ignite.internal.raft.service.ItAbstractListenerSnapshotTest; -import org.apache.ignite.internal.raft.service.RaftGroupListener; -import org.apache.ignite.internal.raft.service.RaftGroupService; -import org.apache.ignite.internal.replicator.ReplicaService; -import org.apache.ignite.internal.replicator.TablePartitionId; -import org.apache.ignite.internal.replicator.TestReplicationGroupId; -import org.apache.ignite.internal.replicator.message.ReplicaRequest; -import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.schema.BinaryRowConverter; -import org.apache.ignite.internal.schema.Column; -import org.apache.ignite.internal.schema.ColumnsExtractor; -import org.apache.ignite.internal.schema.SchemaDescriptor; -import org.apache.ignite.internal.schema.row.Row; -import org.apache.ignite.internal.schema.row.RowAssembler; -import org.apache.ignite.internal.storage.MvPartitionStorage; -import org.apache.ignite.internal.storage.ReadResult; -import org.apache.ignite.internal.storage.RowId; -import org.apache.ignite.internal.storage.engine.MvTableStorage; -import org.apache.ignite.internal.storage.engine.StorageTableDescriptor; -import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier; -import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine; -import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration; -import org.apache.ignite.internal.table.InternalTable; -import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; -import org.apache.ignite.internal.table.distributed.TableMessagesFactory; -import org.apache.ignite.internal.table.distributed.command.FinishTxCommand; -import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand; -import org.apache.ignite.internal.table.distributed.command.UpdateCommand; -import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; -import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; -import org.apache.ignite.internal.table.distributed.raft.PartitionListener; -import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest; -import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowPkReplicaRequest; -import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest; -import org.apache.ignite.internal.table.distributed.replication.request.SingleRowPkReplicaRequest; -import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; -import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; -import org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller; -import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; -import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; -import org.apache.ignite.internal.testframework.WorkDirectoryExtension; -import org.apache.ignite.internal.tx.HybridTimestampTracker; -import org.apache.ignite.internal.tx.TxManager; -import org.apache.ignite.internal.tx.impl.HeapLockManager; -import org.apache.ignite.internal.tx.impl.TransactionIdGenerator; -import org.apache.ignite.internal.tx.impl.TxManagerImpl; -import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; -import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage; -import org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage; -import org.apache.ignite.internal.type.NativeTypes; -import org.apache.ignite.internal.util.PendingComparableValuesTracker; -import org.apache.ignite.network.ClusterNode; -import org.apache.ignite.network.ClusterNodeImpl; -import org.apache.ignite.network.ClusterService; -import org.apache.ignite.network.NetworkAddress; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.extension.ExtendWith; - -/** - * Persistent partitions raft group snapshots tests. - */ -@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class}) -public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<PartitionListener> { - private static final String NODE_NAME = "node1"; - - private static final String NODE_ID = "node1"; - - private static final TestPlacementDriver TEST_PLACEMENT_DRIVER = new TestPlacementDriver(NODE_NAME, NODE_ID); - - /** Factory to create RAFT command messages. */ - private final TableMessagesFactory msgFactory = new TableMessagesFactory(); - - @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 16777216, writeBufferSize = 16777216}}") - private RocksDbStorageEngineConfiguration engineConfig; - - private static final SchemaDescriptor SCHEMA = new SchemaDescriptor( - 1, - new Column[]{new Column("key", NativeTypes.INT64, false)}, - new Column[]{new Column("value", NativeTypes.INT64, false)} - ); - - private static final ColumnsExtractor PK_EXTRACTOR = BinaryRowConverter.keyExtractor(SCHEMA); - - private static final Row FIRST_VALUE_PK = createKeyRow(1); - - private static final Row FIRST_VALUE = createKeyValueRow(1, 1); - - private static final Row SECOND_VALUE_PK = createKeyRow(2); - - private static final Row SECOND_VALUE = createKeyValueRow(2, 2); - - /** Paths for created partition listeners. */ - private final Map<PartitionListener, Path> paths = new ConcurrentHashMap<>(); - - /** Map of node indexes to partition listeners. */ - private final Map<Integer, PartitionListener> partListeners = new ConcurrentHashMap<>(); - - /** Map of node indexes to table storages. */ - private final Map<Integer, MvTableStorage> mvTableStorages = new ConcurrentHashMap<>(); - - /** Map of node indexes to partition storages. */ - private final Map<Integer, MvPartitionStorage> mvPartitionStorages = new ConcurrentHashMap<>(); - - /** Map of node indexes to transaction managers. */ - private final Map<Integer, TxManager> txManagers = new ConcurrentHashMap<>(); - - private final ReplicaService replicaService = mock(ReplicaService.class, RETURNS_DEEP_STUBS); - - private final Function<String, ClusterNode> consistentIdToNode = addr - -> new ClusterNodeImpl(NODE_ID, NODE_NAME, new NetworkAddress(addr, 3333)); - - private final HybridClock hybridClock = new HybridClockImpl(); - - private int stoppedNodeIndex; - - private InternalTable table; - - private final LinkedList<AutoCloseable> closeables = new LinkedList<>(); - - @BeforeEach - @Override - public void beforeTest(TestInfo testInfo) { - super.beforeTest(testInfo); - - closeables.clear(); - } - - @AfterEach - @Override - public void afterTest() throws Exception { - super.afterTest(); - - closeAll(closeables); - } - - @Override - public void beforeFollowerStop(RaftGroupService service, RaftServer server) throws Exception { - PartitionReplicaListener partitionReplicaListener = mockPartitionReplicaListener(service); - - when(replicaService.invoke(any(ClusterNode.class), any())) - .thenAnswer(invocationOnMock -> { - ClusterNode node = invocationOnMock.getArgument(0); - - return partitionReplicaListener.invoke(invocationOnMock.getArgument(1), node.id()); - }); - - for (int i = 0; i < nodes(); i++) { - if (!txManagers.containsKey(i)) { - TxManager txManager = new TxManagerImpl( - service.clusterService(), - replicaService, - new HeapLockManager(), - hybridClock, - new TransactionIdGenerator(i), - TEST_PLACEMENT_DRIVER, - () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS - ); - - txManager.start(); - - txManagers.put(i, txManager); - closeables.add(txManager::stop); - } - } - - TxManager txManager = new TxManagerImpl( - service.clusterService(), - replicaService, - new HeapLockManager(), - hybridClock, - new TransactionIdGenerator(-1), - TEST_PLACEMENT_DRIVER, - () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS - ); - - txManager.start(); - - closeables.add(txManager::stop); - - table = new InternalTableImpl( - "table", - 1, - Int2ObjectMaps.singleton(0, service), - 1, - consistentIdToNode, - txManager, - mock(MvTableStorage.class), - new TestTxStateTableStorage(), - replicaService, - hybridClock, - new HybridTimestampTracker(), - TEST_PLACEMENT_DRIVER - ); - - closeables.add(() -> table.close()); - - table.upsert(FIRST_VALUE, null).get(); - } - - private PartitionReplicaListener mockPartitionReplicaListener(RaftGroupService service) { - PartitionReplicaListener partitionReplicaListener = mock(PartitionReplicaListener.class); - - when(partitionReplicaListener.invoke(any(), any())).thenAnswer(invocationOnMock -> { - ReplicaRequest req = invocationOnMock.getArgument(0); - - if (req instanceof ReadWriteSingleRowPkReplicaRequest || req instanceof ReadOnlyDirectSingleRowReplicaRequest) { - SingleRowPkReplicaRequest req0 = (SingleRowPkReplicaRequest) req; - - if (req0.requestType() == RequestType.RW_GET || req0.requestType() == RequestType.RO_GET) { - List<JraftServerImpl> servers = servers(); - - JraftServerImpl leader = servers.stream() - .filter(server -> server.localPeers(raftGroupId()).contains(service.leader())) - .findFirst().orElseThrow(); - - // We only read from the leader, every other node may not have the latest data. - int storageIndex = servers.indexOf(leader); - - // Here we must account for the stopped node, index in "servers" and index in "mvPartitionStorages" will differ - // for "serverIndex >= stoppedNodeIndex". - if (storageIndex >= stoppedNodeIndex) { - storageIndex++; - } - - MvPartitionStorage partitionStorage = mvPartitionStorages.get(storageIndex); - - Map<ByteBuffer, RowId> primaryIndex = pkIndex(partitionStorage); - RowId rowId = primaryIndex.get(req0.primaryKey()); - - if (rowId == null) { - return completedFuture(null); - } - - BinaryRow row = partitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow(); - - return completedFuture(row); - } else if (req0.requestType() == RequestType.RW_DELETE) { - ReadWriteSingleRowPkReplicaRequest rwReq = (ReadWriteSingleRowPkReplicaRequest) req0; - - UpdateCommand cmd = msgFactory.updateCommand() - .txId(rwReq.transactionId()) - .tablePartitionId(tablePartitionId(new TablePartitionId(1, 0))) - .rowUuid(new RowId(0).uuid()) - .safeTimeLong(hybridClock.nowLong()) - .txCoordinatorId(UUID.randomUUID().toString()) - .build(); - - return service.run(cmd); - } - } else if (req instanceof ReadWriteSingleRowReplicaRequest) { - ReadWriteSingleRowReplicaRequest req0 = (ReadWriteSingleRowReplicaRequest) req; - - UpdateCommand cmd = msgFactory.updateCommand() - .txId(req0.transactionId()) - .tablePartitionId(tablePartitionId(new TablePartitionId(1, 0))) - .rowUuid(new RowId(0).uuid()) - .messageRowToUpdate(msgFactory.timedBinaryRowMessage() - .binaryRowMessage(msgFactory.binaryRowMessage() - .schemaVersion(req0.schemaVersion()) - .binaryTuple(req0.binaryTuple()) - .build()) - .build()) - .safeTimeLong(hybridClock.nowLong()) - .txCoordinatorId(UUID.randomUUID().toString()) - .build(); - - return service.run(cmd); - } else if (req instanceof TxFinishReplicaRequest) { - TxFinishReplicaRequest req0 = (TxFinishReplicaRequest) req; - - FinishTxCommand cmd = msgFactory.finishTxCommand() - .txId(req0.txId()) - .commit(req0.commit()) - .commitTimestampLong(req0.commitTimestampLong()) - .tablePartitionIds(asList(tablePartitionId(new TablePartitionId(1, 0)))) - .safeTimeLong(hybridClock.nowLong()) - .txCoordinatorId(UUID.randomUUID().toString()) - .build(); - - return service.run(cmd) - .thenCompose(ignored -> { - TxCleanupCommand cleanupCmd = msgFactory.txCleanupCommand() - .txId(req0.txId()) - .commit(req0.commit()) - .commitTimestampLong(req0.commitTimestampLong()) - .safeTimeLong(hybridClock.nowLong()) - .txCoordinatorId(UUID.randomUUID().toString()) - .build(); - - return service.run(cleanupCmd); - }); - } - - throw new AssertionError("Unexpected request: " + req); - }); - - return partitionReplicaListener; - } - - @Override - public void afterFollowerStop(RaftGroupService service, RaftServer server, int stoppedNodeIndex) throws Exception { - // Remove the first key - table.delete(FIRST_VALUE_PK, null).get(); - - // Put deleted data again - table.upsert(FIRST_VALUE, null).get(); - - this.stoppedNodeIndex = stoppedNodeIndex; - - mvTableStorages.get(stoppedNodeIndex).stop(); - - paths.remove(partListeners.get(stoppedNodeIndex)); - } - - @Override - public void afterSnapshot(RaftGroupService service) throws Exception { - table.upsert(SECOND_VALUE, null).get(); - - assertNotNull(table.get(SECOND_VALUE_PK, null).join()); - } - - @Override - public BooleanSupplier snapshotCheckClosure(JraftServerImpl restarted, boolean interactedAfterSnapshot) { - MvPartitionStorage storage = getListener(restarted, raftGroupId()).getMvStorage(); - - return () -> { - Map<ByteBuffer, RowId> primaryIndex = pkIndex(storage); - - Row pk = interactedAfterSnapshot ? SECOND_VALUE_PK : FIRST_VALUE_PK; - - RowId rowId = primaryIndex.get(pk.byteBuffer()); - - if (rowId == null) { - return false; - } - - ReadResult read = storage.read(rowId, HybridTimestamp.MAX_VALUE); - - if (read == null) { - return false; - } - - Row value = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE; - - return value.tupleSlice().equals(read.binaryRow().tupleSlice()); - }; - } - - private static Map<ByteBuffer, RowId> pkIndex(MvPartitionStorage storage) { - Map<ByteBuffer, RowId> result = new HashMap<>(); - - RowId rowId = storage.closestRowId(RowId.lowestRowId(0)); - - while (rowId != null) { - BinaryRow binaryRow = storage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow(); - - if (binaryRow != null) { - result.put(PK_EXTRACTOR.extractColumns(binaryRow).byteBuffer(), rowId); - } - - RowId incremented = rowId.increment(); - if (incremented == null) { - break; - } - - rowId = storage.closestRowId(incremented); - } - - return result; - } - - @Override - public Path getListenerPersistencePath(PartitionListener listener, RaftServer server) { - return paths.get(listener); - } - - @Override - public RaftGroupListener createListener(ClusterService service, Path path, int index) { - return paths.entrySet().stream() - .filter(entry -> entry.getValue().equals(path)) - .map(Map.Entry::getKey) - .findAny() - .orElseGet(() -> { - RocksDbStorageEngine storageEngine = new RocksDbStorageEngine("test", engineConfig, path); - storageEngine.start(); - - int tableId = 1; - - MvTableStorage mvTableStorage = storageEngine.createMvTable( - new StorageTableDescriptor(tableId, 1, DEFAULT_DATA_REGION_NAME), - new StorageIndexDescriptorSupplier(mock(CatalogService.class)) - ); - mvTableStorage.start(); - - mvTableStorages.put(index, mvTableStorage); - - int partitionId = 0; - - MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(mvTableStorage, partitionId); - mvPartitionStorages.put(index, mvPartitionStorage); - - PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(tableId, partitionId, mvPartitionStorage); - - PendingComparableValuesTracker<HybridTimestamp, Void> safeTime = new PendingComparableValuesTracker<>( - new HybridTimestamp(1, 0) - ); - - IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler( - DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of()) - ); - - StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler( - partitionId, - partitionDataStorage, - indexUpdateHandler - ); - - TxManager txManager = txManagers.computeIfAbsent(index, k -> { - TxManager txMgr = new TxManagerImpl( - service, - replicaService, - new HeapLockManager(), - hybridClock, - new TransactionIdGenerator(index), - TEST_PLACEMENT_DRIVER, - () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS - ); - txMgr.start(); - closeables.add(txMgr::stop); - - return txMgr; - }); - - PartitionListener listener = new PartitionListener( - txManager, - partitionDataStorage, - storageUpdateHandler, - new TestTxStateStorage(), - safeTime, - new PendingComparableValuesTracker<>(0L) - ) { - @Override - public void onShutdown() { - super.onShutdown(); - - try { - closeAll(mvPartitionStorage::close, mvTableStorage::stop, storageEngine::stop); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - - paths.put(listener, path); - partListeners.put(index, listener); - - return listener; - }); - } - - @Override - public TestReplicationGroupId raftGroupId() { - return new TestReplicationGroupId("partitions"); - } - - @Override - protected Marshaller commandsMarshaller(ClusterService clusterService) { - return new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()); - } - - /** - * Creates a {@link Row} with the supplied key and value. - * - * @param id Key. - * @param value Value. - * @return Row. - */ - private static Row createKeyValueRow(long id, long value) { - RowAssembler rowBuilder = new RowAssembler(SCHEMA); - - rowBuilder.appendLong(id); - rowBuilder.appendLong(value); - - return Row.wrapBinaryRow(SCHEMA, rowBuilder.build()); - } - - private static Row createKeyRow(long id) { - RowAssembler rowBuilder = RowAssembler.keyAssembler(SCHEMA); - - rowBuilder.appendLong(id); - - return Row.wrapKeyOnlyBinaryRow(SCHEMA, rowBuilder.build()); - } - - private static MvPartitionStorage getOrCreateMvPartition(MvTableStorage tableStorage, int partitionId) { - MvPartitionStorage mvPartition = tableStorage.getMvPartition(partitionId); - - if (mvPartition != null) { - return mvPartition; - } - - CompletableFuture<MvPartitionStorage> createMvPartitionFuture = tableStorage.createMvPartition(0); - - assertThat(createMvPartitionFuture, willCompleteSuccessfully()); - - return createMvPartitionFuture.join(); - } -} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index bfb9630e71..1f9cb970b6 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -847,10 +847,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { raftGroupService ); - CompletableFuture<Void> whenReplicaReady = allOf( - ((Loza) raftMgr).raftNodeReadyFuture(replicaGrpId), - table.pkIndexesReadyFuture() - ); + CompletableFuture<Void> whenReplicaReady = table.pkIndexesReadyFuture(); replicaMgr.startReplica( replicaGrpId, diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index f6046b0163..9d89362af9 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -530,7 +530,6 @@ public class TableManagerTest extends IgniteAbstractTest { private void testStoragesGetClearedInMiddleOfFailedRebalance(boolean isTxStorageUnderRebalance) throws NodeStoppingException { when(rm.startRaftGroupService(any(), any(), any(), any())) .thenAnswer(mock -> completedFuture(mock(TopologyAwareRaftGroupService.class))); - when(rm.raftNodeReadyFuture(any())).thenReturn(completedFuture(1L)); createZone(1, 1);