This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 2d4880ef30 IGNITE-20826 Add maxObservableSafeTime initialization on leader election (#3207) 2d4880ef30 is described below commit 2d4880ef30c5d10d4ec74b59cf7a6f78996361c9 Author: Alexander Lapin <lapin1...@gmail.com> AuthorDate: Wed Feb 14 12:33:40 2024 +0200 IGNITE-20826 Add maxObservableSafeTime initialization on leader election (#3207) --- .../apache/ignite/client/fakes/FakeTxManager.java | 5 + .../internal/raft/service/RaftGroupListener.java | 7 + .../internal/raft/server/impl/JraftServerImpl.java | 7 + .../snapshot/local/LocalSnapshotCopierTest.java | 14 +- .../ItAbstractInternalTableScanTest.java | 2 - .../ItInternalTableReadOnlyOperationsTest.java | 4 - .../ReplicasSafeTimePropagationTest.java | 304 +++++++++++++++++++++ .../table/distributed/raft/PartitionListener.java | 32 +-- .../replicator/PartitionReplicaListener.java | 1 - .../org/apache/ignite/internal/tx/TxManager.java | 4 + .../ignite/internal/tx/impl/TxManagerImpl.java | 5 + 11 files changed, 353 insertions(+), 32 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java index 72e1615f8e..61ecc9e848 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java @@ -230,4 +230,9 @@ public class FakeTxManager implements TxManager { public void removeInflight(UUID txId) { // No-op. 
} + + @Override + public HybridClock clock() { + return clock; + } } diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java index 212a34807c..6ca04df0eb 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java @@ -81,4 +81,11 @@ public interface RaftGroupListener { * Invoked once after a raft node has been shut down. */ void onShutdown(); + + /** + * Invoked when the belonging node becomes the leader of the group. + */ + default void onLeaderStart() { + // No-op. + } } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index 366f3b5331..8a00b2d27f 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -796,5 +796,12 @@ public class JraftServerImpl implements RaftServer { public void onShutdown() { listener.onShutdown(); } + + @Override + public void onLeaderStart(long term) { + super.onLeaderStart(term); + + listener.onLeaderStart(); + } } } diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java index 8ecfdc3777..84a8f4240e 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java @@ -16,6 +16,12 @@ */ package 
org.apache.ignite.raft.jraft.storage.snapshot.local; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; + import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ForkJoinPool; @@ -42,7 +48,6 @@ import org.apache.ignite.raft.jraft.util.ByteString; import org.apache.ignite.raft.jraft.util.Utils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; @@ -52,12 +57,6 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.eq; - @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) public class LocalSnapshotCopierTest extends BaseStorageTest { @@ -144,7 +143,6 @@ public class LocalSnapshotCopierTest extends BaseStorageTest { } @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-16620") public void testInterrupt() throws Exception { final CompletableFuture<Message> future = new CompletableFuture<>(); final RpcRequests.GetFileRequest rb = raftOptions.getRaftMessagesFactory() diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java 
index 714e7a05f6..aa54bce1fc 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java @@ -99,8 +99,6 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest */ @BeforeEach public void setUp(TestInfo testInfo) { - when(mockStorage.scan(any(HybridTimestamp.class))).thenReturn(mock(PartitionTimestampCursor.class)); - internalTbl = new DummyInternalTableImpl( mock(ReplicaService.class), mockStorage, ROW_SCHEMA, txConfiguration, storageUpdateConfiguration); } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java index b66c0336d3..4c280a06f1 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java @@ -44,7 +44,6 @@ import org.apache.ignite.internal.configuration.testframework.ConfigurationExten import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; -import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowConverter; @@ -56,7 +55,6 @@ import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguratio import org.apache.ignite.internal.schema.row.Row; import org.apache.ignite.internal.schema.row.RowAssembler; import org.apache.ignite.internal.storage.MvPartitionStorage; -import 
org.apache.ignite.internal.storage.PartitionTimestampCursor; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowPkReplicaRequest; @@ -122,8 +120,6 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest { */ @BeforeEach public void setUp(TestInfo testInfo) { - when(mockStorage.scan(any(HybridTimestamp.class))).thenReturn(mock(PartitionTimestampCursor.class)); - internalTbl = new DummyInternalTableImpl(replicaService, mockStorage, SCHEMA, txConfiguration, storageUpdateConfiguration); lenient().when(readOnlyTx.isReadOnly()).thenReturn(true); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java new file mode 100644 index 0000000000..d2f0a8f2e6 --- /dev/null +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.distributed; + +import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW; +import static org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.lang.SafeTimeReorderException; +import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.network.StaticNodeFinder; +import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; +import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftGroupEventsListener; +import org.apache.ignite.internal.raft.RaftNodeId; +import 
org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.raft.server.RaftGroupOptions; +import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.replicator.TestReplicationGroupId; +import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; +import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; +import org.apache.ignite.internal.table.distributed.raft.PartitionListener; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.storage.state.TxStateStorage; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Replica safeTime propagation tests. 
+ */ +@ExtendWith(ConfigurationExtension.class) +public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest { + @InjectConfiguration("mock: { fsync: false }") + private RaftConfiguration raftConfiguration; + + private static final int BASE_PORT = 1234; + + private static final TestReplicationGroupId GROUP_ID = new TestReplicationGroupId("group_1"); + + private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); + + private static final StaticNodeFinder NODE_FINDER = new StaticNodeFinder( + IntStream.range(BASE_PORT, BASE_PORT + 5) + .mapToObj(p -> new NetworkAddress("localhost", p)) + .collect(Collectors.toList()) + ); + + private AtomicInteger port = new AtomicInteger(BASE_PORT); + + private Map<String, PartialNode> cluster; + + @AfterEach + public void after() throws Exception { + for (PartialNode partialNode : cluster.values()) { + try { + partialNode.stop(); + } catch (NodeStoppingException ignored) { + // No-op, multiple stop. + } + } + } + + /** + * Test verifies that a new leader will reject a command with safeTime less than previously applied within old leader. + * <ol> + * <li>Start three nodes and a raft group with three peers.</li> + * <li>Send command with safe time X.</li> + * <li>Stop the leader - the only node that actually does safeTime watermark validation within onBeforeApply.</li> + * <li>Send command with safe time less than X to the new leader and verify that SafeTimeReorderException is thrown.</li> + * </ol> + */ + @Test + public void testSafeTimeReorderingOnLeaderReElection() throws Exception { + // Start three nodes and a raft group with three peers.
+ { + cluster = Set.of("node1", "node2", "node3").stream().collect(Collectors.toMap(Function.identity(), PartialNode::new)); + + startCluster(cluster); + } + + PartialNode someNode = cluster.values().iterator().next(); + + RaftGroupService raftClient = someNode.raftClient; + + assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); + + long firstSafeTime = calculateSafeTime(someNode.clock); + + // Send command with safe time X. + sendSafeTimeSyncCommand(raftClient, firstSafeTime, false); + + // Stop the leader - the only node that actually does safeTime watermark validation within onBeforeApply. + assertNotNull(raftClient.leader()); + + PartialNode nodeTopStop = cluster.get(raftClient.leader().consistentId()); + + assertNotNull(nodeTopStop); + + nodeTopStop.stop(); + + // Select alive raft client. + Optional<PartialNode> aliveNode = cluster.values().stream().filter(node -> !node.nodeName.equals(nodeTopStop.nodeName)).findFirst(); + + assertTrue(aliveNode.isPresent()); + + RaftGroupService anotherClient = aliveNode.get().raftClient; + + // Send command with safe time less than previously applied to the new leader and verify that SafeTimeReorderException is thrown. + sendSafeTimeSyncCommand(anotherClient, firstSafeTime - 1, true); + + sendSafeTimeSyncCommand(anotherClient, calculateSafeTime(aliveNode.get().clock), false); + } + + /** + * Test verifies that a leader will reject a command with safeTime less than previously applied within leader restart. + * <ol> + * <li>Start two nodes and a raft group with two peers.</li> + * <li>Send command with safe time X.</li> + * <li>Restart the cluster.</li> + * <li>Send command with safe time less than previously applied to the leader before the restart + * and verify that SafeTimeReorderException is thrown.</li> + * </ol> + */ + @Test + public void testSafeTimeReorderingOnLeaderRestart() throws Exception { + // Start two nodes and a raft group with two peers.
+ { + cluster = Set.of("node1", "node2").parallelStream().collect(Collectors.toMap(Function.identity(), PartialNode::new)); + + startCluster(cluster); + } + + PartialNode someNode = cluster.values().iterator().next(); + + RaftGroupService raftClient = someNode.raftClient; + + assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); + + long firstSafeTime = calculateSafeTime(someNode.clock); + + // Send command with safe time X. + sendSafeTimeSyncCommand(raftClient, firstSafeTime, false); + + // Stop all nodes. + for (PartialNode node : cluster.values() + ) { + node.stop(); + } + + // And restart. + startCluster(cluster); + + // Send command with safe time less than previously applied to the leader before the restart + // and verify that SafeTimeReorderException is thrown. + sendSafeTimeSyncCommand(someNode.raftClient, firstSafeTime - 1, true); + + sendSafeTimeSyncCommand(someNode.raftClient, calculateSafeTime(someNode.clock), false); + } + + private void startCluster(Map<String, PartialNode> cluster) throws Exception { + Collection<CompletableFuture<Void>> startingFutures = new ArrayList<>(cluster.size()); + for (PartialNode node : cluster.values()) { + startingFutures.add(node.start()); + } + + CompletableFuture<Void> clusterReadyFuture = CompletableFuture.allOf(startingFutures.toArray(CompletableFuture[]::new)); + + assertThat(clusterReadyFuture, willCompleteSuccessfully()); + } + + private static void sendSafeTimeSyncCommand( + RaftGroupService raftClient, + long safeTime, + boolean expectSafeTimeReorderException + ) { + CompletableFuture<Object> safeTimeCommandFuture = raftClient.run( + REPLICA_MESSAGES_FACTORY + .safeTimeSyncCommand() + .safeTimeLong(safeTime) + .build() + ); + + if (expectSafeTimeReorderException) { + assertThat(safeTimeCommandFuture, willThrow(SafeTimeReorderException.class)); + } else { + assertThat(safeTimeCommandFuture, willCompleteSuccessfully()); + } + } + + private static long calculateSafeTime(HybridClock clock) { + return 
clock.now().addPhysicalTime(CLOCK_SKEW).longValue(); + } + + private class PartialNode { + private final String nodeName; + + private final HybridClock clock; + + private ClusterService clusterService; + + private Loza raftManager; + + private RaftGroupService raftClient; + + PartialNode(String nodeName) { + this.nodeName = nodeName; + this.clock = new HybridClockImpl(); + } + + CompletableFuture<Void> start() throws Exception { + clusterService = ClusterServiceTestUtils.clusterService(nodeName, port.getAndIncrement(), NODE_FINDER); + + clusterService.start(); + + raftManager = new Loza( + clusterService, + raftConfiguration, + workDir.resolve(nodeName + "_loza"), + new HybridClockImpl(), + new RaftGroupEventsClientListener() + ); + + raftManager.start(); + + TxManager txManagerMock = mock(TxManager.class); + when(txManagerMock.clock()).thenReturn(clock); + + return raftManager.startRaftGroupNode( + new RaftNodeId(GROUP_ID, new Peer(nodeName)), + fromConsistentIds(cluster.keySet()), + new PartitionListener( + txManagerMock, + mock(PartitionDataStorage.class), + mock(StorageUpdateHandler.class), + mock(TxStateStorage.class), + mock(PendingComparableValuesTracker.class), + mock(PendingComparableValuesTracker.class), + mock(CatalogService.class) + ), + RaftGroupEventsListener.noopLsnr, + RaftGroupOptions.defaults() + ) + .thenApply(raftClient -> { + this.raftClient = raftClient; + return null; + }); + } + + void stop() throws Exception { + closeAll( + raftManager == null ? null : () -> raftManager.stopRaftNodes(GROUP_ID), + raftManager == null ? null : raftManager::beforeNodeStop, + clusterService == null ? null : clusterService::beforeNodeStop, + raftManager == null ? null : raftManager::stop, + clusterService == null ? 
null : clusterService::stop + ); + } + } +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 2a3a4b4b77..e418646b3a 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed.raft; import static java.util.Objects.requireNonNull; import static java.util.function.Predicate.not; import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.table.distributed.TableUtils.indexIdsAtRwTxBeginTs; import static org.apache.ignite.internal.tx.TxState.ABORTED; @@ -57,7 +58,6 @@ import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand; import org.apache.ignite.internal.storage.BinaryRowAndRowId; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.MvPartitionStorage.Locker; -import org.apache.ignite.internal.storage.PartitionTimestampCursor; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; @@ -103,11 +103,9 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler /** Storage index tracker. */ private final PendingComparableValuesTracker<Long, Void> storageIndexTracker; - // TODO: https://issues.apache.org/jira/browse/IGNITE-20826 Restore on restart /** Is used in order to detect and retry safe time reordering within onBeforeApply. 
*/ - private long maxObservableSafeTime = -1; + private volatile long maxObservableSafeTime = -1; - // TODO: https://issues.apache.org/jira/browse/IGNITE-20826 Restore on restart /** Is used in order to assert safe time reordering within onWrite. */ private long maxObservableSafeTimeVerifier = -1; @@ -138,17 +136,6 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler this.safeTime = safeTime; this.storageIndexTracker = storageIndexTracker; this.catalogService = catalogService; - - // TODO: IGNITE-18502 Excessive full partition scan on node start - try (PartitionTimestampCursor cursor = partitionDataStorage.scan(HybridTimestamp.MAX_VALUE)) { - while (cursor.hasNext()) { - ReadResult readResult = cursor.next(); - - if (readResult.isWriteIntent()) { - storageUpdateHandler.handleWriteIntentRead(readResult.transactionId(), readResult.rowId()); - } - } - } } @Override @@ -169,7 +156,10 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler SafeTimePropagatingCommand cmd = (SafeTimePropagatingCommand) command; long proposedSafeTime = cmd.safeTime().longValue(); - assert proposedSafeTime > maxObservableSafeTimeVerifier : "Safe time reordering detected [current=" + // Because of clock.tick it's guaranteed that two different commands will have different safe timestamps. + // maxObservableSafeTime may match proposedSafeTime only if it is the command that was previously validated and then retried + // by raft client because of either TimeoutException or inner raft server recoverable exception. 
+ assert proposedSafeTime >= maxObservableSafeTimeVerifier : "Safe time reordering detected [current=" + maxObservableSafeTimeVerifier + ", proposed=" + proposedSafeTime + "]"; maxObservableSafeTimeVerifier = proposedSafeTime; @@ -482,6 +472,11 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler storage.close(); } + @Override + public void onLeaderStart() { + maxObservableSafeTime = txManager.clock().now().addPhysicalTime(CLOCK_SKEW).longValue(); + } + @Override public boolean onBeforeApply(Command command) { // This method is synchronized by replication group specific monitor, see ActionRequestProcessor#handleRequest. @@ -489,7 +484,10 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler SafeTimePropagatingCommand cmd = (SafeTimePropagatingCommand) command; long proposedSafeTime = cmd.safeTime().longValue(); - if (proposedSafeTime > maxObservableSafeTime) { + // Because of clock.tick it's guaranteed that two different commands will have different safe timestamps. + // maxObservableSafeTime may match proposedSafeTime only if it is the command that was previously validated and then retried + // by raft client because of either TimeoutException or inner raft server recoverable exception. 
+ if (proposedSafeTime >= maxObservableSafeTime) { maxObservableSafeTime = proposedSafeTime; } else { throw new SafeTimeReorderException(); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 5a5007737d..8c77e17dbb 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -1461,7 +1461,6 @@ public class PartitionReplicaListener implements ReplicaListener { * @param request Transaction finish request. * @return future result of the operation. */ - // TODO: need to properly handle primary replica changes https://issues.apache.org/jira/browse/IGNITE-17615 private CompletableFuture<TransactionResult> processTxFinishAction(TxFinishReplicaRequest request) { // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast Map<TablePartitionId, String> enlistedGroups = (Map<TablePartitionId, String>) (Map<?, ?>) request.groups(); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java index 39c10641c7..aa663d6aa5 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java @@ -22,6 +22,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.function.Supplier; +import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; import 
org.apache.ignite.internal.lang.IgniteInternalException; @@ -213,4 +214,7 @@ public interface TxManager extends IgniteComponent { * @param txId The transction id */ void removeInflight(UUID txId); + + /** Returns the node's hybrid clock. */ + HybridClock clock(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index 875b26423b..9c88cd8363 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -806,6 +806,11 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { tuple.onRemovedInflights(); } + @Override + public HybridClock clock() { + return clock; + } + @Override public void onReceived(NetworkMessage message, String senderConsistentId, @Nullable Long correlationId) { if (!(message instanceof ReplicaResponse) || correlationId != null) {