This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new b594483b22 IGNITE-22214 Meta storage idempotent invokes: implement idempotent cache cleanup logic (#3830)
b594483b22 is described below

commit b594483b22d473ed55af7552757c32e54d445175
Author: Alexander Lapin <lapin1...@gmail.com>
AuthorDate: Thu Jun 6 17:35:26 2024 +0300

    IGNITE-22214 Meta storage idempotent invokes: implement idempotent cache cleanup logic (#3830)
---
 .../DistributionZoneRebalanceEngineTest.java       | 16 ++++-
 .../RebalanceUtilUpdateAssignmentsTest.java        | 17 +++++-
 .../impl/ItIdempotentCommandCacheTest.java         | 64 ++++++++++++++++++--
 .../impl/ItMetaStorageManagerImplTest.java         | 13 +++-
 .../ItMetaStorageMultipleNodesAbstractTest.java    |  6 +-
 .../impl/ItMetaStorageServicePersistenceTest.java  |  9 ++-
 .../metastorage/impl/ItMetaStorageServiceTest.java | 13 +++-
 .../metastorage/impl/ItMetaStorageWatchTest.java   |  6 +-
 .../server/raft/ItMetaStorageRaftGroupTest.java    | 41 +++++++++++--
 .../metastorage/impl/MetaStorageManagerImpl.java   | 69 ++++++++++++++++++++--
 .../server/raft/MetaStorageListener.java           | 29 ++++++++-
 .../server/raft/MetaStorageWriteHandler.java       | 61 ++++++++++++++++++-
 .../impl/IdempotentCommandCacheTest.java           | 28 ++++++---
 .../MetaStorageDeployWatchesCorrectnessTest.java   |  9 ++-
 .../impl/MetaStorageManagerRecoveryTest.java       |  9 ++-
 .../impl/StandaloneMetaStorageManager.java         | 19 +++++-
 .../MultiActorPlacementDriverTest.java             |  9 ++-
 .../PlacementDriverManagerTest.java                |  9 ++-
 .../service/ItAbstractListenerSnapshotTest.java    |  2 +-
 .../ItDistributedConfigurationPropertiesTest.java  |  6 +-
 .../ItDistributedConfigurationStorageTest.java     |  6 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |  9 ++-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  8 ++-
 .../rebalance/ItRebalanceDistributedTest.java      |  8 ++-
 24 files changed, 410 insertions(+), 56 deletions(-)

diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index 445fa9cfd9..cc301bce76 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -30,6 +30,7 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
+import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -72,6 +73,8 @@ import org.apache.ignite.internal.affinity.Assignments;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
 import org.apache.ignite.internal.distributionzones.Node;
@@ -100,6 +103,7 @@ import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -112,10 +116,12 @@ import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Tests the distribution zone dataNodes watch listener in {@link 
DistributionZoneRebalanceEngine}.
  */
+@ExtendWith(ConfigurationExtension.class)
 public class DistributionZoneRebalanceEngineTest extends IgniteAbstractTest {
     private static final String ZONE_NAME_0 = "zone0";
 
@@ -141,6 +147,9 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
     private ScheduledExecutorService rebalanceScheduler;
 
+    @InjectConfiguration
+    private RaftConfiguration raftConfiguration;
+
     @BeforeEach
     public void setUp() {
         String nodeName = "test";
@@ -180,7 +189,12 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
         keyValueStorage = spy(new SimpleInMemoryKeyValueStorage(nodeName));
 
-        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage, mock(ClusterTimeImpl.class));
+        MetaStorageListener metaStorageListener = new MetaStorageListener(
+                keyValueStorage,
+                mock(ClusterTimeImpl.class),
+                raftConfiguration.retryTimeout(),
+                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+        );
 
         RaftGroupService metaStorageService = mock(RaftGroupService.class);
 
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
index 8e98c825c2..af146edfcc 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.internal.distributionzones.rebalance;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
 import static org.apache.ignite.internal.affinity.Assignments.toBytes;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -42,6 +44,8 @@ import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.affinity.Assignments;
 import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -59,6 +63,7 @@ import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -76,7 +81,7 @@ import org.mockito.quality.Strictness;
 /**
  * Tests for updating assignment in the meta storage.
  */
-@ExtendWith(MockitoExtension.class)
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
 @MockitoSettings(strictness = Strictness.LENIENT)
 public class RebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest {
     private static final IgniteLogger LOG = 
Loggers.forClass(RebalanceUtilUpdateAssignmentsTest.class);
@@ -99,6 +104,9 @@ public class RebalanceUtilUpdateAssignmentsTest extends 
IgniteAbstractTest {
             DEFAULT_STORAGE_PROFILE
     );
 
+    @InjectConfiguration
+    private RaftConfiguration raftConfiguration;
+
     private static final int partNum = 2;
     private static final int replicas = 2;
 
@@ -122,7 +130,12 @@ public class RebalanceUtilUpdateAssignmentsTest extends 
IgniteAbstractTest {
 
         keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
 
-        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage, mock(ClusterTimeImpl.class));
+        MetaStorageListener metaStorageListener = new MetaStorageListener(
+                keyValueStorage,
+                mock(ClusterTimeImpl.class),
+                raftConfiguration.retryTimeout(),
+                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+        );
 
         RaftGroupService metaStorageService = mock(RaftGroupService.class);
 
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index 66c0610155..453a80eca4 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
@@ -30,6 +32,7 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.startAsync;
 import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -51,8 +54,12 @@ import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.failure.NoOpFailureProcessor;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.ClockServiceImpl;
+import org.apache.ignite.internal.hlc.ClockWaiter;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.manager.ComponentContext;
@@ -114,8 +121,9 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
     private static final int YIELD_RESULT = 10;
     private static final int ANOTHER_YIELD_RESULT = 20;
 
-    @InjectConfiguration("mock.responseTimeout = 100")
+    @InjectConfiguration("mock.retryTimeout = 10000")
     private RaftConfiguration raftConfiguration;
+
     @InjectConfiguration("mock.idleSyncTimeInterval = 100")
     private MetaStorageConfiguration metaStorageConfiguration;
 
@@ -132,6 +140,10 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
 
         ClusterManagementGroupManager cmgManager;
 
+        ClockWaiter clockWaiter;
+
+        ClockService clockService;
+
         Node(
                 TestInfo testInfo,
                 RaftConfiguration raftConfiguration,
@@ -184,7 +196,17 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
                     clock,
                     topologyAwareRaftGroupServiceFactory,
                     new NoOpMetricManager(),
-                    metaStorageConfiguration
+                    metaStorageConfiguration,
+                    raftConfiguration.retryTimeout(),
+                    completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+            );
+
+            clockWaiter = new ClockWaiter(clusterService.nodeName(), clock);
+
+            clockService = new ClockServiceImpl(
+                    clock,
+                    clockWaiter,
+                    () -> TEST_MAX_CLOCK_SKEW_MILLIS
             );
         }
 
@@ -193,7 +215,10 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
                 
when(cmgManager.metaStorageNodes()).thenReturn(metaStorageNodesFut);
             }
 
-            assertThat(startAsync(new ComponentContext(), clusterService, 
raftManager, metaStorageManager), willCompleteSuccessfully());
+            assertThat(
+                    startAsync(new ComponentContext(), clusterService, 
raftManager, metaStorageManager, clockWaiter),
+                    willCompleteSuccessfully()
+            );
         }
 
         void deployWatches() {
@@ -201,7 +226,7 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
         }
 
         void stop() throws Exception {
-            List<IgniteComponent> components = List.of(metaStorageManager, 
raftManager, clusterService);
+            List<IgniteComponent> components = List.of(clockWaiter, 
metaStorageManager, raftManager, clusterService);
 
             closeAll(Stream.concat(
                     components.stream().map(c -> c::beforeNodeStop),
@@ -364,6 +389,8 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
         // Restart cluster.
         startCluster(testInfo);
 
+        long timestampAfterRestartPhysicalLong = 
nodes.get(0).clockService.now().getPhysical();
+
         leader = leader(raftClient());
 
         // Run same idempotent command one more time and check that condition 
wasn't re-evaluated, but was retrieved from the cache instead.
@@ -374,9 +401,36 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
             assertTrue((Boolean) commandProcessingResult2);
             assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(), 
TEST_VALUE));
         } else {
-            assertEquals(YIELD_RESULT, ((StatementResult) 
commandProcessingResult).getAsInt());
+            assertEquals(YIELD_RESULT, ((StatementResult) 
commandProcessingResult2).getAsInt());
             assertTrue(leader.checkValueInStorage(TEST_KEY_2.bytes(), 
TEST_VALUE_2));
         }
+
+        for (Node node : nodes) {
+            assertThat(node.clockService.waitFor(
+                    new HybridTimestamp(
+                            timestampAfterRestartPhysicalLong + 
raftConfiguration.retryTimeout().value()
+                                    + node.clockService.maxClockSkewMillis(),
+                            0
+                    )
+            ), willCompleteSuccessfully());
+        }
+
+        for (Node node : nodes) {
+            node.metaStorageManager.evictIdempotentCommandsCache();
+        }
+
+
+        // Run same idempotent command one more time and check that condition 
**was** re-evaluated and not retrieved from the cache.
+        CompletableFuture<Object> commandProcessingResultFuture3 = 
raftClient().run(idempotentCommand);
+        assertThat(commandProcessingResultFuture3, willCompleteSuccessfully());
+        Object commandProcessingResult3 = commandProcessingResultFuture3.get();
+        if (idempotentCommand instanceof InvokeCommand) {
+            assertFalse((Boolean) commandProcessingResult3);
+            assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(), 
ANOTHER_VALUE));
+        } else {
+            assertEquals(ANOTHER_YIELD_RESULT, ((StatementResult) 
commandProcessingResult3).getAsInt());
+            assertTrue(leader.checkValueInStorage(TEST_KEY_2.bytes(), 
ANOTHER_VALUE_2));
+        }
     }
 
     private Node leader(RaftGroupService raftClient) {
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index ab273a4d70..024087ba0b 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.metastorage.impl;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
 import static 
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
@@ -99,10 +100,12 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
 
     private MetaStorageManagerImpl metaStorageManager;
 
+    @InjectConfiguration
+    private RaftConfiguration raftConfiguration;
+
     @BeforeEach
     void setUp(
             TestInfo testInfo,
-            @InjectConfiguration RaftConfiguration raftConfiguration,
             @InjectConfiguration("mock.idleSyncTimeInterval = 100") 
MetaStorageConfiguration metaStorageConfiguration
     ) {
         var addr = new NetworkAddress("localhost", 10_000);
@@ -143,7 +146,9 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
                 clock,
                 topologyAwareRaftGroupServiceFactory,
                 new NoOpMetricManager(),
-                metaStorageConfiguration
+                metaStorageConfiguration,
+                raftConfiguration.retryTimeout(),
+                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
         );
 
         assertThat(
@@ -230,7 +235,9 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
                 storage,
                 new HybridClockImpl(),
                 mock(TopologyAwareRaftGroupServiceFactory.class),
-                new NoOpMetricManager()
+                new NoOpMetricManager(),
+                raftConfiguration.retryTimeout(),
+                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
         );
 
         assertThat(metaStorageManager.stopAsync(new ComponentContext()), 
willCompleteSuccessfully());
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index 5e55468fb1..b565090691 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
@@ -202,7 +204,9 @@ public abstract class 
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
                     clock,
                     topologyAwareRaftGroupServiceFactory,
                     new NoOpMetricManager(),
-                    metaStorageConfiguration
+                    metaStorageConfiguration,
+                    raftConfiguration.retryTimeout(),
+                    completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
             );
 
             deployWatchesFut = metaStorageManager.deployWatches();
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
index d73d74e6b9..ac10b1c626 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.nio.charset.StandardCharsets;
@@ -158,7 +160,12 @@ public class ItMetaStorageServicePersistenceTest extends 
ItAbstractListenerSnaps
             return s;
         });
 
-        return new MetaStorageListener(storage, new ClusterTimeImpl(nodeName, 
new IgniteSpinBusyLock(), new HybridClockImpl()));
+        return new MetaStorageListener(
+                storage,
+                new ClusterTimeImpl(nodeName, new IgniteSpinBusyLock(), new 
HybridClockImpl()),
+                raftConfiguration.retryTimeout(),
+                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+        );
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index ab00dedab7..36332b02a9 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toUnmodifiableSet;
+import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.and;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
@@ -186,8 +188,11 @@ public class ItMetaStorageServiceTest extends 
BaseIgniteAbstractTest {
 
         private MetaStorageService metaStorageService;
 
+        private RaftConfiguration raftConfiguration;
+
         Node(ClusterService clusterService, RaftConfiguration 
raftConfiguration, Path dataPath) {
             this.clusterService = clusterService;
+            this.raftConfiguration = raftConfiguration;
 
             HybridClock clock = new HybridClockImpl();
 
@@ -197,7 +202,6 @@ public class ItMetaStorageServiceTest extends 
BaseIgniteAbstractTest {
                     dataPath.resolve(name()),
                     clock
             );
-
             this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), 
new IgniteSpinBusyLock(), clock);
 
             this.mockStorage = mock(KeyValueStorage.class);
@@ -234,7 +238,12 @@ public class ItMetaStorageServiceTest extends 
BaseIgniteAbstractTest {
 
             assert peer != null;
 
-            var listener = new MetaStorageListener(mockStorage, clusterTime);
+            var listener = new MetaStorageListener(
+                    mockStorage,
+                    clusterTime,
+                    raftConfiguration.retryTimeout(),
+                    completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+            );
 
             var raftNodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, peer);
 
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index 583d08488f..947f0869a9 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
+import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -193,7 +195,9 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
                     clock,
                     topologyAwareRaftGroupServiceFactory,
                     new NoOpMetricManager(),
-                    metaStorageConfiguration
+                    metaStorageConfiguration,
+                    raftConfiguration.retryTimeout(),
+                    completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
             );
 
             components.add(metaStorageManager);
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
index 27d09f80a5..50894e927e 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.metastorage.server.raft;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.collectingAndThen;
 import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.waitForTopology;
 import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
@@ -406,18 +408,45 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
 
         var raftNodeId1 = new RaftNodeId(MetastorageGroupId.INSTANCE, 
membersConfiguration.peer(localMemberName(cluster.get(0))));
 
-        metaStorageRaftSrv1.startRaftNode(raftNodeId1, membersConfiguration,
-                new MetaStorageListener(mockStorage, 
mock(ClusterTimeImpl.class)), defaults());
+        metaStorageRaftSrv1.startRaftNode(
+                raftNodeId1,
+                membersConfiguration,
+                new MetaStorageListener(
+                        mockStorage,
+                        mock(ClusterTimeImpl.class),
+                        raftConfiguration.retryTimeout(),
+                        completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+                ),
+                defaults()
+        );
 
         var raftNodeId2 = new RaftNodeId(MetastorageGroupId.INSTANCE, 
membersConfiguration.peer(localMemberName(cluster.get(1))));
 
-        metaStorageRaftSrv2.startRaftNode(raftNodeId2, membersConfiguration,
-                new MetaStorageListener(mockStorage, 
mock(ClusterTimeImpl.class)), defaults());
+        metaStorageRaftSrv2.startRaftNode(
+                raftNodeId2,
+                membersConfiguration,
+                new MetaStorageListener(
+                        mockStorage,
+                        mock(ClusterTimeImpl.class),
+                        raftConfiguration.retryTimeout(),
+                        completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+                ),
+                defaults()
+        );
 
         var raftNodeId3 = new RaftNodeId(MetastorageGroupId.INSTANCE, 
membersConfiguration.peer(localMemberName(cluster.get(2))));
 
-        metaStorageRaftSrv3.startRaftNode(raftNodeId3, membersConfiguration,
-                new MetaStorageListener(mockStorage, 
mock(ClusterTimeImpl.class)), defaults());
+        metaStorageRaftSrv3.startRaftNode(
+                raftNodeId3,
+                membersConfiguration,
+                new MetaStorageListener(
+                        mockStorage,
+                        mock(ClusterTimeImpl.class),
+                        raftConfiguration.retryTimeout(),
+                        completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+                ),
+                defaults()
+        );
 
         metaStorageRaftGrpSvc1 = 
waitForRaftGroupServiceSafely(RaftGroupServiceImpl.start(
                 MetastorageGroupId.INSTANCE,
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index cb63d3ddf9..01f87b37a5 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -30,6 +30,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongSupplier;
+import org.apache.ignite.configuration.ConfigurationValue;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -130,6 +132,14 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
 
     private volatile MetaStorageConfiguration metaStorageConfiguration;
 
+    private final ConfigurationValue<Long> idempotentCacheTtl;
+
+    private final CompletableFuture<LongSupplier> maxClockSkewMillisFuture;
+
+    private volatile MetaStorageListener followerListener;
+
+    private volatile MetaStorageListener learnerListener;
+
     /**
      * The constructor.
      *
@@ -140,6 +150,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
      * @param storage Storage. This component owns this resource and will 
manage its lifecycle.
      * @param clock A hybrid logical clock.
      * @param metricManager Metric manager.
+     * @param maxClockSkewMillisFuture Future with maximum clock skew in 
milliseconds.
      */
     public MetaStorageManagerImpl(
             ClusterService clusterService,
@@ -149,7 +160,9 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
             KeyValueStorage storage,
             HybridClock clock,
             TopologyAwareRaftGroupServiceFactory 
topologyAwareRaftGroupServiceFactory,
-            MetricManager metricManager
+            MetricManager metricManager,
+            ConfigurationValue<Long> idempotentCacheTtl,
+            CompletableFuture<LongSupplier> maxClockSkewMillisFuture
     ) {
         this.clusterService = clusterService;
         this.raftMgr = raftMgr;
@@ -157,9 +170,11 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         this.logicalTopologyService = logicalTopologyService;
         this.storage = storage;
         this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), 
busyLock, clock);
-        metaStorageMetricSource = new MetaStorageMetricSource(clusterTime);
+        this.metaStorageMetricSource = new 
MetaStorageMetricSource(clusterTime);
         this.topologyAwareRaftGroupServiceFactory = 
topologyAwareRaftGroupServiceFactory;
         this.metricManager = metricManager;
+        this.idempotentCacheTtl = idempotentCacheTtl;
+        this.maxClockSkewMillisFuture = maxClockSkewMillisFuture;
     }
 
     /**
@@ -175,9 +190,22 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
             HybridClock clock,
             TopologyAwareRaftGroupServiceFactory 
topologyAwareRaftGroupServiceFactory,
             MetricManager metricManager,
-            MetaStorageConfiguration configuration
+            MetaStorageConfiguration configuration,
+            ConfigurationValue<Long> idempotentCacheTtl,
+            CompletableFuture<LongSupplier> maxClockSkewMillisFuture
     ) {
-        this(clusterService, cmgMgr, logicalTopologyService, raftMgr, storage, 
clock, topologyAwareRaftGroupServiceFactory, metricManager);
+        this(
+                clusterService,
+                cmgMgr,
+                logicalTopologyService,
+                raftMgr,
+                storage,
+                clock,
+                topologyAwareRaftGroupServiceFactory,
+                metricManager,
+                idempotentCacheTtl,
+                maxClockSkewMillisFuture
+        );
 
         configure(configuration);
     }
@@ -293,10 +321,17 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
 
         assert localMetaStorageConfiguration != null : "Meta Storage 
configuration has not been set";
 
+        followerListener = new MetaStorageListener(
+                storage,
+                clusterTime,
+                idempotentCacheTtl,
+                maxClockSkewMillisFuture
+        );
+
         CompletableFuture<TopologyAwareRaftGroupService> raftServiceFuture = 
raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
                 new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                 configuration,
-                new MetaStorageListener(storage, clusterTime),
+                followerListener,
                 RaftGroupEventsListener.noopLsnr,
                 disruptorConfig,
                 topologyAwareRaftGroupServiceFactory
@@ -334,10 +369,17 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
 
         assert localPeer != null;
 
+        learnerListener = new MetaStorageListener(
+                storage,
+                clusterTime,
+                idempotentCacheTtl,
+                maxClockSkewMillisFuture
+        );
+
         return raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
                 new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                 configuration,
-                new MetaStorageListener(storage, clusterTime),
+                learnerListener,
                 RaftGroupEventsListener.noopLsnr,
                 disruptorConfig
         );
@@ -845,4 +887,19 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart() {
         return 
recoveryFinishedFuture.thenCompose(storage::notifyRevisionUpdateListenerOnStart);
     }
+
+    /**
+     * Evicts obsolete idempotent command cache entries (volatile and persistent) via each follower/learner listener created so far.
+     */
+    @TestOnly
+    @Deprecated(forRemoval = true)
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 cache eviction should be triggered by MS GC instead.
+    public void evictIdempotentCommandsCache() {
+        if (followerListener != null) {
+            followerListener.evictIdempotentCommandsCache();
+        }
+        if (learnerListener != null) {
+            learnerListener.evictIdempotentCommandsCache();
+        }
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index a49e594983..163bd2dcad 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -24,7 +24,11 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
+import java.util.function.LongSupplier;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.command.GetAllCommand;
@@ -44,6 +48,7 @@ import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Meta storage listener.
@@ -60,9 +65,19 @@ public class MetaStorageListener implements 
RaftGroupListener, BeforeApplyHandle
      *
      * @param storage Storage.
      */
-    public MetaStorageListener(KeyValueStorage storage, ClusterTimeImpl 
clusterTime) {
+    public MetaStorageListener(
+            KeyValueStorage storage,
+            ClusterTimeImpl clusterTime,
+            ConfigurationValue<Long> idempotentCacheTtl,
+            CompletableFuture<LongSupplier> maxClockSkewMillisFuture
+    ) {
         this.storage = storage;
-        this.writeHandler = new MetaStorageWriteHandler(storage, clusterTime);
+        this.writeHandler = new MetaStorageWriteHandler(
+                storage,
+                clusterTime,
+                idempotentCacheTtl,
+                maxClockSkewMillisFuture
+        );
     }
 
     @Override
@@ -174,4 +189,14 @@ public class MetaStorageListener implements 
RaftGroupListener, BeforeApplyHandle
     @Override
     public void onShutdown() {
     }
+
+    /**
+     * Evicts obsolete idempotent command cache entries (volatile and persistent) using {@link HybridTimestamp#MIN_VALUE} as safe time.
+     */
+    @TestOnly
+    @Deprecated(forRemoval = true)
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 cache eviction should be triggered by MS GC instead.
+    public void evictIdempotentCommandsCache() {
+        writeHandler.evictIdempotentCommandsCache(HybridTimestamp.MIN_VALUE);
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index 95c3685fb1..33bc88b658 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -18,13 +18,18 @@
 package org.apache.ignite.internal.metastorage.server.raft;
 
 import static java.util.Arrays.copyOfRange;
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.util.ByteUtils.byteToBoolean;
 
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.LongSupplier;
+import org.apache.ignite.configuration.ConfigurationValue;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -61,6 +66,7 @@ import 
org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
@@ -81,9 +87,20 @@ public class MetaStorageWriteHandler {
 
     private final Map<CommandId, IdempotentCommandCachedResult> 
idempotentCommandCache = new ConcurrentHashMap<>();
 
-    MetaStorageWriteHandler(KeyValueStorage storage, ClusterTimeImpl 
clusterTime) {
+    private final ConfigurationValue<Long> idempotentCacheTtl;
+
+    private final CompletableFuture<LongSupplier> maxClockSkewMillisFuture;
+
+    MetaStorageWriteHandler(
+            KeyValueStorage storage,
+            ClusterTimeImpl clusterTime,
+            ConfigurationValue<Long> idempotentCacheTtl,
+            CompletableFuture<LongSupplier> maxClockSkewMillisFuture
+    ) {
         this.storage = storage;
         this.clusterTime = clusterTime;
+        this.idempotentCacheTtl = idempotentCacheTtl;
+        this.maxClockSkewMillisFuture = maxClockSkewMillisFuture;
     }
 
     /**
@@ -95,7 +112,14 @@ public class MetaStorageWriteHandler {
         CommandClosure<WriteCommand> resultClosure;
 
         if (command instanceof IdempotentCommand) {
-            CommandId commandId = ((IdempotentCommand) command).id();
+            IdempotentCommand idempotentCommand = (IdempotentCommand) command;
+            CommandId commandId = idempotentCommand.id();
+
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Remove.
+            if (idempotentCommand.safeTime().getPhysical() % 100 == 0) {
+                evictIdempotentCommandsCache(idempotentCommand.safeTime());
+            }
+
             IdempotentCommandCachedResult cachedResult = 
idempotentCommandCache.get(commandId);
 
             if (cachedResult != null) {
@@ -352,6 +376,50 @@ public class MetaStorageWriteHandler {
         }
     }
 
+    /**
+     * Removes obsolete entries from both volatile and persistent idempotent command cache.
+     *
+     * <p>An entry is obsolete when the physical part of its command start time is not greater than the physical part of the cleanup
+     * timestamp minus the sum of the idempotent cache TTL and the maximum clock skew. The cleanup runs in a callback of
+     * {@code maxClockSkewMillisFuture}; a failure of that future or of the cleanup itself is logged instead of being silently dropped.
+     *
+     * @param safeTime Trigger operation safe time. TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Remove.
+     */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Call on meta storage compaction.
+    void evictIdempotentCommandsCache(HybridTimestamp safeTime) {
+        HybridTimestamp cleanupTimestamp = clusterTime.now();
+        LOG.info("Idempotent command cache cleanup started [cleanupTimestamp={}].", cleanupTimestamp);
+
+        maxClockSkewMillisFuture.thenAccept(maxClockSkewMillis -> {
+            List<CommandId> commandIdsToRemove = idempotentCommandCache.entrySet().stream()
+                    .filter(entry -> entry.getValue().commandStartTime.getPhysical()
+                            <= cleanupTimestamp.getPhysical() - (idempotentCacheTtl.value() + maxClockSkewMillis.getAsLong()))
+                    .map(Map.Entry::getKey)
+                    .collect(toList());
+
+            if (!commandIdsToRemove.isEmpty()) {
+                // NOTE(review): the storage key is built with an empty prefix; confirm it matches the key under which the
+                // cached result was persisted, otherwise the persistent entries are never removed.
+                List<byte[]> commandIdStorageKeys = commandIdsToRemove.stream()
+                        .map(commandId -> ArrayUtils.concat(new byte[]{}, ByteUtils.toBytes(commandId)))
+                        .collect(toList());
+
+                storage.removeAll(commandIdStorageKeys, safeTime);
+
+                commandIdsToRemove.forEach(idempotentCommandCache.keySet()::remove);
+            }
+
+            LOG.info("Idempotent command cache cleanup finished [cleanupTimestamp={}, cleanupCompletionTimestamp={},"
+                            + " removedEntriesCount={}, cacheSize={}].", cleanupTimestamp, clusterTime.now(), commandIdsToRemove.size(),
+                    idempotentCommandCache.size());
+        }).exceptionally(throwable -> {
+            // The resulting future is not returned to any caller, so this is the only place where a failure can be reported.
+            LOG.error("Idempotent command cache cleanup failed [cleanupTimestamp={}].", throwable, cleanupTimestamp);
+
+            return null;
+        });
+    }
+
     private static class IdempotentCommandCachedResult {
         @Nullable
         final Serializable result;
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
index 968e129d8c..c1cdd33bfd 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
@@ -32,6 +34,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.lang.ByteArray;
@@ -47,22 +51,27 @@ import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStora
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Tests for idempotency of {@link 
org.apache.ignite.internal.metastorage.command.IdempotentCommand}.
  */
-public class IdempotentCommandCacheTest {
+@ExtendWith(ConfigurationExtension.class)
+public class IdempotentCommandCacheTest extends BaseIgniteAbstractTest {
     private static final String NODE_NAME = "node";
 
     private static final MetaStorageCommandsFactory CMD_FACTORY = new 
MetaStorageCommandsFactory();
 
-    private final KeyValueStorage storage;
+    private KeyValueStorage storage;
 
-    private final MetaStorageListener metaStorageListener;
+    private MetaStorageListener metaStorageListener;
 
     private final HybridClock clock = new HybridClockImpl();
 
@@ -71,14 +80,17 @@ public class IdempotentCommandCacheTest {
 
     private final CommandIdGenerator commandIdGenerator = new 
CommandIdGenerator(() -> UUID.randomUUID().toString());
 
-    /**
-     * Constructor.
-     */
-    public IdempotentCommandCacheTest() {
+    @InjectConfiguration
+    private RaftConfiguration raftConfiguration;
+
+    @BeforeEach
+    public void setUp() {
         storage = new SimpleInMemoryKeyValueStorage(NODE_NAME);
         metaStorageListener = new MetaStorageListener(
                 storage,
-                new ClusterTimeImpl(NODE_NAME, new IgniteSpinBusyLock(), clock)
+                new ClusterTimeImpl(NODE_NAME, new IgniteSpinBusyLock(), 
clock),
+                raftConfiguration.retryTimeout(),
+                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
         );
     }
 
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
index d6fc9c0431..c548f0d734 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -43,6 +44,7 @@ import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -56,6 +58,9 @@ public class MetaStorageDeployWatchesCorrectnessTest extends 
IgniteAbstractTest
     @InjectConfiguration
     private static MetaStorageConfiguration metaStorageConfiguration;
 
+    @InjectConfiguration
+    private static RaftConfiguration raftConfiguration;
+
     /**
      * Returns a stream with test arguments.
      *
@@ -87,7 +92,9 @@ public class MetaStorageDeployWatchesCorrectnessTest extends 
IgniteAbstractTest
                         clock,
                         mock(TopologyAwareRaftGroupServiceFactory.class),
                         new NoOpMetricManager(),
-                        metaStorageConfiguration
+                        metaStorageConfiguration,
+                        raftConfiguration.retryTimeout(),
+                        completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
                 ),
                 StandaloneMetaStorageManager.create()
         );
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
index a8b86c57b2..ac65131cac 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -48,6 +49,7 @@ import org.apache.ignite.internal.network.MessagingService;
 import 
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.internal.raft.RaftManager;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.network.NodeMetadata;
@@ -65,6 +67,9 @@ public class MetaStorageManagerRecoveryTest extends 
BaseIgniteAbstractTest {
     @InjectConfiguration
     private static MetaStorageConfiguration metaStorageConfiguration;
 
+    @InjectConfiguration
+    private static RaftConfiguration raftConfiguration;
+
     private MetaStorageManagerImpl metaStorageManager;
 
     private KeyValueStorage kvs;
@@ -89,7 +94,9 @@ public class MetaStorageManagerRecoveryTest extends 
BaseIgniteAbstractTest {
                 clock,
                 mock(TopologyAwareRaftGroupServiceFactory.class),
                 new NoOpMetricManager(),
-                metaStorageConfiguration
+                metaStorageConfiguration,
+                raftConfiguration.retryTimeout(),
+                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
         );
     }
 
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index c9b02c071a..1e70463ad3 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.Collections.singleton;
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -28,6 +29,7 @@ import java.io.Serializable;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.LongSupplier;
 import org.apache.ignite.configuration.ConfigurationValue;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -46,6 +48,7 @@ import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
@@ -105,7 +108,9 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
                 keyValueStorage,
                 mock(TopologyAwareRaftGroupServiceFactory.class),
                 mockConfiguration(),
-                clock
+                clock,
+                mockRaftConfiguration().retryTimeout(),
+                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
         );
     }
 
@@ -126,7 +131,9 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
             KeyValueStorage storage,
             TopologyAwareRaftGroupServiceFactory raftServiceFactory,
             MetaStorageConfiguration configuration,
-            HybridClock clock
+            HybridClock clock,
+            ConfigurationValue<Long> idempotentCacheTtl,
+            CompletableFuture<LongSupplier> maxClockSkewMillisFuture
     ) {
         super(
                 clusterService,
@@ -137,7 +144,9 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
                 clock,
                 raftServiceFactory,
                 new NoOpMetricManager(),
-                configuration
+                configuration,
+                idempotentCacheTtl,
+                maxClockSkewMillisFuture
         );
     }
 
@@ -211,6 +220,10 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
         return configuration;
     }
 
+    private static RaftConfiguration mockRaftConfiguration() {
+        return mock(RaftConfiguration.class, LENIENT_SETTINGS);
+    }
+
     private static CompletableFuture<Serializable> runCommand(Command command, 
RaftGroupListener listener) {
         CompletableFuture<Serializable> future = new CompletableFuture<>();
 
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
index ad403899f3..2282eb86f5 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
@@ -43,6 +43,7 @@ import java.util.stream.IntStream;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.TestClockService;
@@ -254,6 +255,8 @@ public class MultiActorPlacementDriverTest extends 
BasePlacementDriverTest {
 
             var storage = new SimpleInMemoryKeyValueStorage(nodeName);
 
+            ClockService clockService = new TestClockService(nodeClock);
+
             var metaStorageManager = new MetaStorageManagerImpl(
                     clusterService,
                     cmgManager,
@@ -263,7 +266,9 @@ public class MultiActorPlacementDriverTest extends 
BasePlacementDriverTest {
                     nodeClock,
                     topologyAwareRaftGroupServiceFactory,
                     new NoOpMetricManager(),
-                    metaStorageConfiguration
+                    metaStorageConfiguration,
+                    raftConfiguration.retryTimeout(),
+                    completedFuture(clockService::maxClockSkewMillis)
             );
 
             if (this.metaStorageManager == null) {
@@ -279,7 +284,7 @@ public class MultiActorPlacementDriverTest extends 
BasePlacementDriverTest {
                     logicalTopologyService,
                     raftManager,
                     topologyAwareRaftGroupServiceFactory,
-                    new TestClockService(nodeClock)
+                    clockService
             );
 
             res.add(new Node(nodeName, clusterService, raftManager, 
metaStorageManager, placementDriverManager));
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index b7cda99359..f586f89e0b 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -63,6 +63,7 @@ import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -191,6 +192,8 @@ public class PlacementDriverManagerTest extends 
BasePlacementDriverTest {
 
         var storage = new SimpleInMemoryKeyValueStorage(nodeName);
 
+        ClockService clockService = new TestClockService(nodeClock);
+
         metaStorageManager = new MetaStorageManagerImpl(
                 clusterService,
                 cmgManager,
@@ -200,7 +203,9 @@ public class PlacementDriverManagerTest extends 
BasePlacementDriverTest {
                 nodeClock,
                 topologyAwareRaftGroupServiceFactory,
                 new NoOpMetricManager(),
-                metaStorageConfiguration
+                metaStorageConfiguration,
+                raftConfiguration.retryTimeout(),
+                completedFuture(clockService::maxClockSkewMillis)
         );
 
         placementDriverManager = new PlacementDriverManager(
@@ -212,7 +217,7 @@ public class PlacementDriverManagerTest extends 
BasePlacementDriverTest {
                 logicalTopologyService,
                 raftManager,
                 topologyAwareRaftGroupServiceFactory,
-                new TestClockService(nodeClock)
+                clockService
         );
 
         ComponentContext componentContext = new ComponentContext();
diff --git 
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
 
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
index 01e8aebd08..2913279d95 100644
--- 
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
+++ 
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
@@ -107,7 +107,7 @@ public abstract class ItAbstractListenerSnapshotTest<T 
extends RaftGroupListener
     private ScheduledExecutorService executor;
 
     @InjectConfiguration
-    private RaftConfiguration raftConfiguration;
+    protected RaftConfiguration raftConfiguration;
 
     /**
      * Create executor for raft group services.
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index 4471754bff..ee6af33877 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.configuration;
 
 import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toUnmodifiableList;
+import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -209,7 +211,9 @@ public class ItDistributedConfigurationPropertiesTest 
extends BaseIgniteAbstract
                     clock,
                     topologyAwareRaftGroupServiceFactory,
                     new NoOpMetricManager(),
-                    metaStorageConfiguration
+                    metaStorageConfiguration,
+                    raftConfiguration.retryTimeout(),
+                    completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
             );
 
             deployWatchesFut = metaStorageManager.deployWatches();
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index e6cc4ba817..04cdc3d132 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.configuration.storage;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -181,7 +183,9 @@ public class ItDistributedConfigurationStorageTest extends 
BaseIgniteAbstractTes
                     clock,
                     topologyAwareRaftGroupServiceFactory,
                     new NoOpMetricManager(),
-                    metaStorageConfiguration
+                    metaStorageConfiguration,
+                    raftConfiguration.retryTimeout(),
+                    completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
             );
 
             deployWatchesFut = metaStorageManager.deployWatches();
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 80051692c6..e85b913e65 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -417,6 +417,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         InvokeInterceptor metaStorageInvokeInterceptor = 
metaStorageInvokeInterceptorByNode.get(idx);
 
+        CompletableFuture<LongSupplier> maxClockSkewFuture = new 
CompletableFuture<>();
+
         var metaStorageMgr = new MetaStorageManagerImpl(
                 clusterSvc,
                 cmgManager,
@@ -426,7 +428,9 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 hybridClock,
                 topologyAwareRaftGroupServiceFactory,
                 metricManager,
-                metaStorageConfiguration
+                metaStorageConfiguration,
+                raftConfiguration.retryTimeout(),
+                maxClockSkewFuture
         ) {
             @Override
             public CompletableFuture<Boolean> invoke(Condition condition, 
Collection<Operation> success, Collection<Operation> failure) {
@@ -464,12 +468,15 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
         SchemaSynchronizationConfiguration schemaSyncConfiguration = 
clusterConfigRegistry.getConfiguration(
                 SchemaSynchronizationConfiguration.KEY
         );
+
         ClockService clockService = new ClockServiceImpl(
                 hybridClock,
                 clockWaiter,
                 () -> schemaSyncConfiguration.maxClockSkew().value()
         );
 
+        maxClockSkewFuture.complete(clockService::maxClockSkewMillis);
+
         var placementDriverManager = new PlacementDriverManager(
                 name,
                 metaStorageMgr,
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index e966a158e3..80b612e8c0 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -576,6 +576,8 @@ public class IgniteImpl implements Ignite {
                 raftGroupEventsClientListener
         );
 
+        CompletableFuture<LongSupplier> maxClockSkewMillisFuture = new 
CompletableFuture<>();
+
         metaStorageMgr = new MetaStorageManagerImpl(
                 clusterSvc,
                 cmgMgr,
@@ -584,7 +586,9 @@ public class IgniteImpl implements Ignite {
                 new RocksDbKeyValueStorage(name, 
workDir.resolve(METASTORAGE_DB_PATH), failureProcessor),
                 clock,
                 topologyAwareRaftGroupServiceFactory,
-                metricManager
+                metricManager,
+                raftConfiguration.retryTimeout(),
+                maxClockSkewMillisFuture
         );
 
         this.cfgStorage = new DistributedConfigurationStorage(name, 
metaStorageMgr);
@@ -606,6 +610,8 @@ public class IgniteImpl implements Ignite {
 
         clockService = new ClockServiceImpl(clock, clockWaiter, new 
SameValueLongSupplier(() -> schemaSyncConfig.maxClockSkew().value()));
 
+        maxClockSkewMillisFuture.complete(clockService::maxClockSkewMillis);
+
         Consumer<LongFunction<CompletableFuture<?>>> registry = c -> 
metaStorageMgr.registerRevisionUpdateListener(c::apply);
 
         placementDriverMgr = new PlacementDriverManager(
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 9306da8ddd..71afc3cbad 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.rebalance;
 
 import static java.util.Collections.reverse;
 import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toSet;
@@ -33,6 +34,7 @@ import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUt
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
@@ -1101,7 +1103,9 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     hybridClock,
                     topologyAwareRaftGroupServiceFactory,
                     metricManager,
-                    metaStorageConfiguration
+                    metaStorageConfiguration,
+                    raftConfiguration.retryTimeout(),
+                    completedFuture(() -> DEFAULT_MAX_CLOCK_SKEW_MS)
             );
 
             var placementDriver = new TestPlacementDriver(() -> 
PRIMARY_FILTER.apply(clusterService.topologyService().allMembers()));
@@ -1125,7 +1129,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
             ClockService clockService = new ClockServiceImpl(
                     hybridClock,
                     clockWaiter,
-                    () -> TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS
+                    () -> DEFAULT_MAX_CLOCK_SKEW_MS
             );
 
             TransactionInflights transactionInflights = new 
TransactionInflights(placementDriver, clockService);

Reply via email to