This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d4880ef30 IGNITE-20826 Add maxObservableSafeTime initialization on 
leader election (#3207)
2d4880ef30 is described below

commit 2d4880ef30c5d10d4ec74b59cf7a6f78996361c9
Author: Alexander Lapin <lapin1...@gmail.com>
AuthorDate: Wed Feb 14 12:33:40 2024 +0200

    IGNITE-20826 Add maxObservableSafeTime initialization on leader election 
(#3207)
---
 .../apache/ignite/client/fakes/FakeTxManager.java  |   5 +
 .../internal/raft/service/RaftGroupListener.java   |   7 +
 .../internal/raft/server/impl/JraftServerImpl.java |   7 +
 .../snapshot/local/LocalSnapshotCopierTest.java    |  14 +-
 .../ItAbstractInternalTableScanTest.java           |   2 -
 .../ItInternalTableReadOnlyOperationsTest.java     |   4 -
 .../ReplicasSafeTimePropagationTest.java           | 304 +++++++++++++++++++++
 .../table/distributed/raft/PartitionListener.java  |  32 +--
 .../replicator/PartitionReplicaListener.java       |   1 -
 .../org/apache/ignite/internal/tx/TxManager.java   |   4 +
 .../ignite/internal/tx/impl/TxManagerImpl.java     |   5 +
 11 files changed, 353 insertions(+), 32 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 72e1615f8e..61ecc9e848 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -230,4 +230,9 @@ public class FakeTxManager implements TxManager {
     public void removeInflight(UUID txId) {
         // No-op.
     }
+
+    @Override
+    public HybridClock clock() {
+        return clock;
+    }
 }
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
index 212a34807c..6ca04df0eb 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
@@ -81,4 +81,11 @@ public interface RaftGroupListener {
      * Invoked once after a raft node has been shut down.
      */
     void onShutdown();
+
+    /**
+     * Invoked when the belonging node becomes the leader of the group.
+     */
+    default void onLeaderStart() {
+        // No-op.
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 366f3b5331..8a00b2d27f 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -796,5 +796,12 @@ public class JraftServerImpl implements RaftServer {
         public void onShutdown() {
             listener.onShutdown();
         }
+
+        @Override
+        public void onLeaderStart(long term) {
+            super.onLeaderStart(term);
+
+            listener.onLeaderStart();
+        }
     }
 }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java
index 8ecfdc3777..84a8f4240e 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.ignite.raft.jraft.storage.snapshot.local;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ForkJoinPool;
@@ -42,7 +48,6 @@ import org.apache.ignite.raft.jraft.util.ByteString;
 import org.apache.ignite.raft.jraft.util.Utils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
@@ -52,12 +57,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.eq;
-
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
 public class LocalSnapshotCopierTest extends BaseStorageTest {
@@ -144,7 +143,6 @@ public class LocalSnapshotCopierTest extends 
BaseStorageTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-16620")
     public void testInterrupt() throws Exception {
         final CompletableFuture<Message> future = new CompletableFuture<>();
         final RpcRequests.GetFileRequest rb = 
raftOptions.getRaftMessagesFactory()
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 714e7a05f6..aa54bce1fc 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -99,8 +99,6 @@ public abstract class ItAbstractInternalTableScanTest extends 
IgniteAbstractTest
      */
     @BeforeEach
     public void setUp(TestInfo testInfo) {
-        
when(mockStorage.scan(any(HybridTimestamp.class))).thenReturn(mock(PartitionTimestampCursor.class));
-
         internalTbl = new DummyInternalTableImpl(
                 mock(ReplicaService.class), mockStorage, ROW_SCHEMA, 
txConfiguration, storageUpdateConfiguration);
     }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
index b66c0336d3..4c280a06f1 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
@@ -44,7 +44,6 @@ import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExten
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
@@ -56,7 +55,6 @@ import 
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguratio
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.table.InternalTable;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowPkReplicaRequest;
@@ -122,8 +120,6 @@ public class ItInternalTableReadOnlyOperationsTest extends 
IgniteAbstractTest {
      */
     @BeforeEach
     public void setUp(TestInfo testInfo) {
-        
when(mockStorage.scan(any(HybridTimestamp.class))).thenReturn(mock(PartitionTimestampCursor.class));
-
         internalTbl = new DummyInternalTableImpl(replicaService, mockStorage, 
SCHEMA, txConfiguration, storageUpdateConfiguration);
 
         lenient().when(readOnlyTx.isReadOnly()).thenReturn(true);
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
new file mode 100644
index 0000000000..d2f0a8f2e6
--- /dev/null
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW;
+import static 
org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.lang.SafeTimeReorderException;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.StaticNodeFinder;
+import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.TestReplicationGroupId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Replica safeTime propagation tests.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest {
+    @InjectConfiguration("mock: { fsync: false }")
+    private RaftConfiguration raftConfiguration;
+
+    private static final int BASE_PORT = 1234;
+
+    private static final TestReplicationGroupId GROUP_ID = new 
TestReplicationGroupId("group_1");
+
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
+    private static final StaticNodeFinder NODE_FINDER = new StaticNodeFinder(
+            IntStream.range(BASE_PORT, BASE_PORT + 5)
+                    .mapToObj(p -> new NetworkAddress("localhost", p))
+                    .collect(Collectors.toList())
+    );
+
+    private AtomicInteger port = new AtomicInteger(BASE_PORT);
+
+    private Map<String, PartialNode> cluster;
+
+    @AfterEach
+    public void after() throws Exception {
+        for (PartialNode partialNode : cluster.values()) {
+            try {
+                partialNode.stop();
+            } catch (NodeStoppingException ignored) {
+                // No-op, multiple stop.
+            }
+        }
+    }
+
+    /**
+     * Test verifies that a new leader will reject a command with safeTime 
less than previously applied within old leader.
+     * <ol>
+     *     <li>Start three nodes and a raft group with three peers.</li>
+     *     <li>Send command with safe time X.</li>
+     *     <li>Stop the leader - the only node that actually does safeTime 
watermark validation within onBeforeApply.</li>
+     *     <li>Send command with safe time less than X to the new leader and 
verify that SafeTimeReorderException is thrown.</li>
+     * </ol>
+     */
+    @Test
+    public void testSafeTimeReorderingOnLeaderReElection() throws Exception {
+        // Start three nodes and a raft group with three peers.
+        {
+            cluster = Set.of("node1", "node2", 
"node3").stream().collect(Collectors.toMap(Function.identity(), 
PartialNode::new));
+
+            startCluster(cluster);
+        }
+
+        PartialNode someNode = cluster.values().iterator().next();
+
+        RaftGroupService raftClient = someNode.raftClient;
+
+        assertThat(raftClient.refreshLeader(), willCompleteSuccessfully());
+
+        long firstSafeTime = calculateSafeTime(someNode.clock);
+
+        // Send command with safe time X.
+        sendSafeTimeSyncCommand(raftClient, firstSafeTime, false);
+
+        // Stop the leader - the only node that actually does safeTime watermark 
validation within onBeforeApply.
+        assertNotNull(raftClient.leader());
+
+        PartialNode nodeTopStop = 
cluster.get(raftClient.leader().consistentId());
+
+        assertNotNull(nodeTopStop);
+
+        nodeTopStop.stop();
+
+        // Select alive raft client.
+        Optional<PartialNode> aliveNode = 
cluster.values().stream().filter(node -> 
!node.nodeName.equals(nodeTopStop.nodeName)).findFirst();
+
+        assertTrue(aliveNode.isPresent());
+
+        RaftGroupService anotherClient = aliveNode.get().raftClient;
+
+        // Send command with safe time less than previously applied to the new 
leader and verify that SafeTimeReorderException is thrown.
+        sendSafeTimeSyncCommand(anotherClient, firstSafeTime - 1, true);
+
+        sendSafeTimeSyncCommand(anotherClient, 
calculateSafeTime(aliveNode.get().clock), false);
+    }
+
+    /**
+     * Test verifies that a leader will reject a command with safeTime less 
than previously applied within leader restart.
+     * <ol>
+     *     <li>Start two nodes and a raft group with two peers.</li>
+     *     <li>Send command with safe time X.</li>
+     *     <li>Restart the cluster.</li>
+     *     <li>Send command with safe time less than previously applied to the 
leader before the restart
+     *     and verify that SafeTimeReorderException is thrown.</li>
+     * </ol>
+     */
+    @Test
+    public void testSafeTimeReorderingOnLeaderRestart() throws Exception {
+        // Start two nodes and a raft group with two peers.
+        {
+            cluster = Set.of("node1", 
"node2").parallelStream().collect(Collectors.toMap(Function.identity(), 
PartialNode::new));
+
+            startCluster(cluster);
+        }
+
+        PartialNode someNode = cluster.values().iterator().next();
+
+        RaftGroupService raftClient = someNode.raftClient;
+
+        assertThat(raftClient.refreshLeader(), willCompleteSuccessfully());
+
+        long firstSafeTime = calculateSafeTime(someNode.clock);
+
+        // Send command with safe time X.
+        sendSafeTimeSyncCommand(raftClient, firstSafeTime, false);
+
+        // Stop all nodes.
+        for (PartialNode node : cluster.values()
+        ) {
+            node.stop();
+        }
+
+        // And restart.
+        startCluster(cluster);
+
+        // Send command with safe time less than previously applied to the 
leader before the restart
+        // and verify that SafeTimeReorderException is thrown.
+        sendSafeTimeSyncCommand(someNode.raftClient, firstSafeTime - 1, true);
+
+        sendSafeTimeSyncCommand(someNode.raftClient, 
calculateSafeTime(someNode.clock), false);
+    }
+
+    private void startCluster(Map<String, PartialNode> cluster) throws 
Exception {
+        Collection<CompletableFuture<Void>> startingFutures = new 
ArrayList<>(cluster.size());
+        for (PartialNode node : cluster.values()) {
+            startingFutures.add(node.start());
+        }
+
+        CompletableFuture<Void> clusterReadyFuture = 
CompletableFuture.allOf(startingFutures.toArray(CompletableFuture[]::new));
+
+        assertThat(clusterReadyFuture, willCompleteSuccessfully());
+    }
+
+    private static void sendSafeTimeSyncCommand(
+            RaftGroupService raftClient,
+            long safeTime,
+            boolean expectSafeTimeReorderException
+    ) {
+        CompletableFuture<Object> safeTimeCommandFuture = raftClient.run(
+                REPLICA_MESSAGES_FACTORY
+                        .safeTimeSyncCommand()
+                        .safeTimeLong(safeTime)
+                        .build()
+        );
+
+        if (expectSafeTimeReorderException) {
+            assertThat(safeTimeCommandFuture, 
willThrow(SafeTimeReorderException.class));
+        } else {
+            assertThat(safeTimeCommandFuture, willCompleteSuccessfully());
+        }
+    }
+
+    private static long calculateSafeTime(HybridClock clock) {
+        return clock.now().addPhysicalTime(CLOCK_SKEW).longValue();
+    }
+
+    private class PartialNode {
+        private final String nodeName;
+
+        private final HybridClock clock;
+
+        private ClusterService clusterService;
+
+        private Loza raftManager;
+
+        private RaftGroupService raftClient;
+
+        PartialNode(String nodeName) {
+            this.nodeName = nodeName;
+            this.clock = new HybridClockImpl();
+        }
+
+        CompletableFuture<Void> start() throws Exception {
+            clusterService = ClusterServiceTestUtils.clusterService(nodeName, 
port.getAndIncrement(), NODE_FINDER);
+
+            clusterService.start();
+
+            raftManager = new Loza(
+                    clusterService,
+                    raftConfiguration,
+                    workDir.resolve(nodeName + "_loza"),
+                    new HybridClockImpl(),
+                    new RaftGroupEventsClientListener()
+            );
+
+            raftManager.start();
+
+            TxManager txManagerMock = mock(TxManager.class);
+            when(txManagerMock.clock()).thenReturn(clock);
+
+            return raftManager.startRaftGroupNode(
+                            new RaftNodeId(GROUP_ID, new Peer(nodeName)),
+                            fromConsistentIds(cluster.keySet()),
+                            new PartitionListener(
+                                    txManagerMock,
+                                    mock(PartitionDataStorage.class),
+                                    mock(StorageUpdateHandler.class),
+                                    mock(TxStateStorage.class),
+                                    mock(PendingComparableValuesTracker.class),
+                                    mock(PendingComparableValuesTracker.class),
+                                    mock(CatalogService.class)
+                            ),
+                            RaftGroupEventsListener.noopLsnr,
+                            RaftGroupOptions.defaults()
+                    )
+                    .thenApply(raftClient -> {
+                        this.raftClient = raftClient;
+                        return null;
+                    });
+        }
+
+        void stop() throws Exception {
+            closeAll(
+                    raftManager == null ? null : () -> 
raftManager.stopRaftNodes(GROUP_ID),
+                    raftManager == null ? null : raftManager::beforeNodeStop,
+                    clusterService == null ? null : 
clusterService::beforeNodeStop,
+                    raftManager == null ? null : raftManager::stop,
+                    clusterService == null ? null : clusterService::stop
+            );
+        }
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 2a3a4b4b77..e418646b3a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed.raft;
 import static java.util.Objects.requireNonNull;
 import static java.util.function.Predicate.not;
 import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.table.distributed.TableUtils.indexIdsAtRwTxBeginTs;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
@@ -57,7 +58,6 @@ import 
org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
 import org.apache.ignite.internal.storage.BinaryRowAndRowId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.Locker;
-import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
@@ -103,11 +103,9 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
     /** Storage index tracker. */
     private final PendingComparableValuesTracker<Long, Void> 
storageIndexTracker;
 
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-20826 Restore on 
restart
     /** Is used in order to detect and retry safe time reordering within 
onBeforeApply. */
-    private long maxObservableSafeTime = -1;
+    private volatile long maxObservableSafeTime = -1;
 
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-20826 Restore on 
restart
     /** Is used in order to assert safe time reordering within onWrite. */
     private long maxObservableSafeTimeVerifier = -1;
 
@@ -138,17 +136,6 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
         this.safeTime = safeTime;
         this.storageIndexTracker = storageIndexTracker;
         this.catalogService = catalogService;
-
-        // TODO: IGNITE-18502 Excessive full partition scan on node start
-        try (PartitionTimestampCursor cursor = 
partitionDataStorage.scan(HybridTimestamp.MAX_VALUE)) {
-            while (cursor.hasNext()) {
-                ReadResult readResult = cursor.next();
-
-                if (readResult.isWriteIntent()) {
-                    
storageUpdateHandler.handleWriteIntentRead(readResult.transactionId(), 
readResult.rowId());
-                }
-            }
-        }
     }
 
     @Override
@@ -169,7 +156,10 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
                 SafeTimePropagatingCommand cmd = (SafeTimePropagatingCommand) 
command;
                 long proposedSafeTime = cmd.safeTime().longValue();
 
-                assert proposedSafeTime > maxObservableSafeTimeVerifier : 
"Safe time reordering detected [current="
+                // Because of clock.tick it's guaranteed that two different 
commands will have different safe timestamps.
+                // maxObservableSafeTime may match proposedSafeTime only if it 
is the command that was previously validated and then retried
+                // by raft client because of either TimeoutException or inner 
raft server recoverable exception.
+                assert proposedSafeTime >= maxObservableSafeTimeVerifier : 
"Safe time reordering detected [current="
                         + maxObservableSafeTimeVerifier + ", proposed=" + 
proposedSafeTime + "]";
 
                 maxObservableSafeTimeVerifier = proposedSafeTime;
@@ -482,6 +472,11 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
         storage.close();
     }
 
+    @Override
+    public void onLeaderStart() {
+        maxObservableSafeTime = 
txManager.clock().now().addPhysicalTime(CLOCK_SKEW).longValue();
+    }
+
     @Override
     public boolean onBeforeApply(Command command) {
         // This method is synchronized by replication group specific monitor, 
see ActionRequestProcessor#handleRequest.
@@ -489,7 +484,10 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
             SafeTimePropagatingCommand cmd = (SafeTimePropagatingCommand) 
command;
             long proposedSafeTime = cmd.safeTime().longValue();
 
-            if (proposedSafeTime > maxObservableSafeTime) {
+            // Because of clock.tick it's guaranteed that two different 
commands will have different safe timestamps.
+            // maxObservableSafeTime may match proposedSafeTime only if it is 
the command that was previously validated and then retried
+            // by raft client because of either TimeoutException or inner raft 
server recoverable exception.
+            if (proposedSafeTime >= maxObservableSafeTime) {
                 maxObservableSafeTime = proposedSafeTime;
             } else {
                 throw new SafeTimeReorderException();
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 5a5007737d..8c77e17dbb 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1461,7 +1461,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param request Transaction finish request.
      * @return future result of the operation.
      */
-    // TODO: need to properly handle primary replica changes 
https://issues.apache.org/jira/browse/IGNITE-17615
     private CompletableFuture<TransactionResult> 
processTxFinishAction(TxFinishReplicaRequest request) {
         // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use 
ZonePartitionIdMessage and remove cast
         Map<TablePartitionId, String> enlistedGroups = (Map<TablePartitionId, 
String>) (Map<?, ?>) request.groups();
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 39c10641c7..aa663d6aa5 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -22,6 +22,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -213,4 +214,7 @@ public interface TxManager extends IgniteComponent {
      * @param txId The transction id
      */
     void removeInflight(UUID txId);
+
+    /** Returns the node's hybrid clock. */
+    HybridClock clock();
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 875b26423b..9c88cd8363 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -806,6 +806,11 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         tuple.onRemovedInflights();
     }
 
+    @Override
+    public HybridClock clock() {
+        return clock;
+    }
+
     @Override
     public void onReceived(NetworkMessage message, String senderConsistentId, 
@Nullable Long correlationId) {
         if (!(message instanceof ReplicaResponse) || correlationId != null) {

Reply via email to