This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new eeac526827 IGNITE-22819 Fix Metastorage revisions inconsistency (#4143)
eeac526827 is described below

commit eeac5268276251dcaadc04d23c54821a6c4d9806
Author: Alexander Lapin <lapin1...@gmail.com>
AuthorDate: Fri Aug 2 15:02:58 2024 +0300

    IGNITE-22819 Fix Metastorage revisions inconsistency (#4143)
---
 .../org/apache/ignite/internal/util/ByteUtils.java |  20 ++
 .../DistributionZoneRebalanceEngineTest.java       |  13 +-
 .../RebalanceUtilUpdateAssignmentsTest.java        |  14 +-
 modules/jacoco-report/build.gradle                 |   1 +
 modules/metastorage-cache/README.md                |  10 +
 modules/metastorage-cache/build.gradle             |  33 +++
 .../cache/IdempotentCacheVacuumizer.java           | 165 +++++++++++++++
 .../cache/IdempotentCacheVacuumizerTest.java       | 225 +++++++++++++++++++++
 .../impl/ItIdempotentCommandCacheTest.java         |  11 +-
 .../impl/ItMetaStorageManagerImplTest.java         |   9 +-
 .../ItMetaStorageMultipleNodesAbstractTest.java    |   6 +-
 .../impl/ItMetaStorageServicePersistenceTest.java  |   9 +-
 .../metastorage/impl/ItMetaStorageServiceTest.java |  12 +-
 .../metastorage/impl/ItMetaStorageWatchTest.java   |   6 +-
 .../server/raft/ItMetaStorageRaftGroupTest.java    |  25 +--
 .../EvictIdempotentCommandsCacheCommand.java       |  30 +++
 .../command/MetastorageCommandsMessageGroup.java   |   3 +
 .../impl/MetaStorageLeaderElectionListener.java    |  11 +-
 .../metastorage/impl/MetaStorageManagerImpl.java   |  70 ++-----
 .../metastorage/impl/MetaStorageServiceImpl.java   |  32 +++
 .../metastorage/server/KeyValueStorage.java        |   8 +
 .../server/persistence/RocksDbKeyValueStorage.java |  39 ++--
 .../server/raft/MetaStorageListener.java           |  23 +--
 .../server/raft/MetaStorageWriteHandler.java       |  93 ++++-----
 .../impl/IdempotentCommandCacheTest.java           |  14 +-
 .../MetaStorageDeployWatchesCorrectnessTest.java   |   9 +-
 .../impl/MetaStorageManagerRecoveryTest.java       |   9 +-
 .../server/BasicOperationsKeyValueStorageTest.java |  72 ++++++-
 .../impl/StandaloneMetaStorageManager.java         |  14 +-
 .../server/SimpleInMemoryKeyValueStorage.java      |  21 +-
 .../replicator/ItReplicaLifecycleTest.java         |   8 +-
 .../MultiActorPlacementDriverTest.java             |   4 +-
 .../PlacementDriverManagerTest.java                |   4 +-
 .../service/ItAbstractListenerSnapshotTest.java    |   2 +-
 modules/runner/build.gradle                        |   2 +
 .../ItDistributedConfigurationPropertiesTest.java  |   6 +-
 .../ItDistributedConfigurationStorageTest.java     |   6 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  26 ++-
 .../rebalance/ItRebalanceDistributedTest.java      |   5 +-
 settings.gradle                                    |   2 +
 41 files changed, 756 insertions(+), 320 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
index 8efda24489..86563608ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
@@ -268,6 +268,26 @@ public class ByteUtils {
         }
     }
 
+    /**
+     * Deserializes an object from byte array using native java serialization 
mechanism.
+     *
+     * @param bytes Byte array.
+     * @param from – the offset in the buffer of the first byte to read.
+     * @param length – the maximum number of bytes to read from the buffer.
+     * @return Object.
+     */
+    // TODO https://issues.apache.org/jira/browse/IGNITE-22894 Extend test 
coverage.
+    public static <T> T fromBytes(byte[] bytes, int from, int length) {
+        try (
+                var bis = new ByteArrayInputStream(bytes, from, length);
+                var in = new ObjectInputStream(bis)
+        ) {
+            return (T) in.readObject();
+        } catch (IOException | ClassNotFoundException e) {
+            throw new IgniteInternalException("Could not deserialize an 
object", e);
+        }
+    }
+
     /**
      * Converts a string to a byte array using {@link StandardCharsets#UTF_8}, 
{@code null} if {@code s} is {@code null}.
      *
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index b08c23f7d6..baf31c1b7f 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -30,7 +30,6 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -70,7 +69,6 @@ import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
 import org.apache.ignite.internal.distributionzones.Node;
@@ -99,7 +97,6 @@ import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -140,9 +137,6 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
     private CatalogManager catalogManager;
 
-    @InjectConfiguration
-    private RaftConfiguration raftConfiguration;
-
     @BeforeEach
     public void setUp() {
         String nodeName = "test";
@@ -184,12 +178,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
         ClusterTimeImpl clusterTime = new ClusterTimeImpl("node", new 
IgniteSpinBusyLock(), clock);
 
-        MetaStorageListener metaStorageListener = new MetaStorageListener(
-                keyValueStorage,
-                clusterTime,
-                raftConfiguration.retryTimeout(),
-                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
-        );
+        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage, clusterTime);
 
         RaftGroupService metaStorageService = mock(RaftGroupService.class);
 
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
index e87547b909..0fd9dc6979 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
@@ -17,13 +17,11 @@
 
 package org.apache.ignite.internal.distributionzones.rebalance;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
 import static org.apache.ignite.internal.affinity.Assignments.toBytes;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -46,7 +44,6 @@ import org.apache.ignite.internal.affinity.Assignments;
 import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -66,7 +63,6 @@ import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -110,9 +106,6 @@ public class RebalanceUtilUpdateAssignmentsTest extends 
IgniteAbstractTest {
 
     private final HybridClock clock = new HybridClockImpl();
 
-    @InjectConfiguration
-    private RaftConfiguration raftConfiguration;
-
     private static final int partNum = 2;
     private static final int replicas = 2;
 
@@ -138,12 +131,7 @@ public class RebalanceUtilUpdateAssignmentsTest extends 
IgniteAbstractTest {
 
         ClusterTimeImpl clusterTime = new ClusterTimeImpl("node", new 
IgniteSpinBusyLock(), clock);
 
-        MetaStorageListener metaStorageListener = new MetaStorageListener(
-                keyValueStorage,
-                clusterTime,
-                raftConfiguration.retryTimeout(),
-                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
-        );
+        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage, clusterTime);
 
         RaftGroupService metaStorageService = mock(RaftGroupService.class);
 
diff --git a/modules/jacoco-report/build.gradle 
b/modules/jacoco-report/build.gradle
index 5260664c55..4ff0312858 100644
--- a/modules/jacoco-report/build.gradle
+++ b/modules/jacoco-report/build.gradle
@@ -31,6 +31,7 @@ dependencies {
     implementation project(':ignite-core')
     implementation project(':ignite-metastorage-api')
     implementation project(':ignite-metastorage')
+    implementation project(':ignite-metastorage-cache')
     implementation project(':ignite-network')
     implementation project(':ignite-network-api')
     implementation project(':ignite-raft')
diff --git a/modules/metastorage-cache/README.md 
b/modules/metastorage-cache/README.md
new file mode 100644
index 0000000000..0983a81056
--- /dev/null
+++ b/modules/metastorage-cache/README.md
@@ -0,0 +1,10 @@
+# Metastorage idempotent command cache eviction module
+
+Module responsible for metastorage idempotent command cache eviction.
+
+Metastorage idempotent command cache is a mapping of commandId -> command 
evaluation result that is used to store the results of
+the invoke and multi-invoke commands in order not to re-evaluate invoke 
condition in case of operation retry. By the definition it's
+necessary to store such results for the command-processing-timeout + max clock 
skew, thus after given interval corresponding cached command
+may re evicted. IdempotentCacheVacuumizer is an actor to trigger such 
evictions. It would be reasonable to put it inside metastorage module
+itself instead of creating new one, however it's not possible because of 
cyclic dependency. IdempotentCacheVacuumizer requires maxClockSkew
+that is stored in distributed configuration, that on it's turn requires 
metastorage. That's why the new module was introduced.
\ No newline at end of file
diff --git a/modules/metastorage-cache/build.gradle 
b/modules/metastorage-cache/build.gradle
new file mode 100644
index 0000000000..a4ac189796
--- /dev/null
+++ b/modules/metastorage-cache/build.gradle
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply from: "$rootDir/buildscripts/java-core.gradle"
+apply from: "$rootDir/buildscripts/publishing.gradle"
+apply from: "$rootDir/buildscripts/java-junit5.gradle"
+
+description = 'ignite-metastorage-cache'
+
+dependencies {
+    implementation project(':ignite-core')
+    implementation project(':ignite-metastorage')
+    implementation libs.jetbrains.annotations
+
+    testImplementation testFixtures(project(":ignite-core"))
+    testImplementation libs.mockito.junit
+    testImplementation libs.mockito.core
+    testImplementation libs.hamcrest.core
+}
diff --git 
a/modules/metastorage-cache/src/main/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizer.java
 
b/modules/metastorage-cache/src/main/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizer.java
new file mode 100644
index 0000000000..bad5415364
--- /dev/null
+++ 
b/modules/metastorage-cache/src/main/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizer.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.cache;
+
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.impl.ElectionListener;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Scheduler wrapper that triggers idempotent cache vacuumization with an 
ability to suspend and resume the triggering. It is valid but not
+ * effective to have multiple vacuumizers at the same time, meaning that 
best-effort uniqueness is preferable. In order to achieve such
+ * best-effort uniqueness it's possible to use meta storage leader 
collocation: start/resume triggering on leader election if the leader is
+ * collocated with a local node, and suspend upon loss of collocation with the 
leader.
+ * In case of exception within vacuumization action, vacuumizer will just log 
a warning without suspending the scheduler.
+ */
+public class IdempotentCacheVacuumizer implements IgniteComponent, 
ElectionListener {
+    private static final IgniteLogger LOG = 
Loggers.forClass(IdempotentCacheVacuumizer.class);
+
+    private final AtomicBoolean triggerVacuumization;
+
+    private final String nodeName;
+
+    /** Scheduler to run vacuumization actions. */
+    private final ScheduledExecutorService scheduler;
+
+    /** Action that will trigger vacuumization process. */
+    private final Consumer<HybridTimestamp> vacuumizationAction;
+
+    /** Idempotent cache ttl. */
+    private final ConfigurationValue<Long> idempotentCacheTtl;
+
+    /** Clock service. */
+    private final ClockService clockService;
+
+    /** The time to delay first execution. */
+    private final long initialDelay;
+
+    /** The delay between the termination of one execution and the 
commencement of the next. */
+    private final long delay;
+
+    /** The time unit of the initialDelay and delay parameters. */
+    private final TimeUnit unit;
+
+    /** Vacuumization task future. */
+    private volatile ScheduledFuture<?> scheduledFuture;
+
+    /**
+     * The constructor.
+     *
+     * @param nodeName Node name.
+     * @param scheduler Scheduler to run vacuumization actions.
+     * @param vacuumizationAction Action that will trigger vacuumization 
process.
+     * @param idempotentCacheTtl Idempotent cache ttl.
+     * @param clockService Clock service.
+     * @param initialDelay The time to delay first execution.
+     * @param delay The delay between the termination of one execution and the 
commencement of the next.
+     * @param unit The time unit of the initialDelay and delay parameters.
+     */
+    public IdempotentCacheVacuumizer(
+            String nodeName,
+            ScheduledExecutorService scheduler,
+            Consumer<HybridTimestamp> vacuumizationAction,
+            ConfigurationValue<Long> idempotentCacheTtl,
+            ClockService clockService,
+            long initialDelay,
+            long delay,
+            TimeUnit unit
+    ) {
+        this.nodeName = nodeName;
+        this.triggerVacuumization = new AtomicBoolean(false);
+        this.scheduler = scheduler;
+        this.vacuumizationAction = vacuumizationAction;
+        this.idempotentCacheTtl = idempotentCacheTtl;
+        this.clockService = clockService;
+        this.initialDelay = initialDelay;
+        this.delay = delay;
+        this.unit = unit;
+    }
+
+    @Override
+    public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
+        scheduledFuture = scheduler.scheduleWithFixedDelay(
+                () -> {
+                    if (triggerVacuumization.get()) {
+                        try {
+                            
vacuumizationAction.accept(hybridTimestamp(clockService.nowLong()
+                                    - (idempotentCacheTtl.value() + 
clockService.maxClockSkewMillis())));
+                        } catch (Exception e) {
+                            LOG.warn("An exception occurred while executing 
idempotent cache vacuumization action."
+                                    + " Idempotent cache vacuumizer won't be 
stopped.", e);
+                        }
+                    }
+                },
+                initialDelay,
+                delay,
+                unit
+        );
+
+        return nullCompletedFuture();
+    }
+
+    @Override
+    public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+        }
+
+        return nullCompletedFuture();
+    }
+
+    @Override
+    public void onLeaderElected(ClusterNode newLeader) {
+        if (newLeader.name().equals(nodeName)) {
+            startLocalVacuumizationTriggering();
+        } else {
+            suspendLocalVacuumizationTriggering();
+        }
+    }
+
+    /**
+     * Starts local vacuumization triggering. Will take no effect if 
vacuumizer was previously stopped.
+     */
+    void startLocalVacuumizationTriggering() {
+        triggerVacuumization.set(true);
+        LOG.info("Idempotent cache vacuumizer started.");
+    }
+
+    /**
+     * Suspends further local vacuumization triggering. Will take no effect if 
vacuumizer was previously stopped.
+     */
+    void suspendLocalVacuumizationTriggering() {
+        triggerVacuumization.set(false);
+        LOG.info("Idempotent cache vacuumizer suspended.");
+    }
+}
diff --git 
a/modules/metastorage-cache/src/test/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizerTest.java
 
b/modules/metastorage-cache/src/test/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizerTest.java
new file mode 100644
index 0000000000..31c29ef938
--- /dev/null
+++ 
b/modules/metastorage-cache/src/test/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizerTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.cache;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.internal.TestHybridClock;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockSettings;
+import org.mockito.quality.Strictness;
+
+/**
+ * Tests for idempotency of {@link IdempotentCacheVacuumizer}.
+ */
+public class IdempotentCacheVacuumizerTest extends BaseIgniteAbstractTest {
+    private static final int TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS = 1_000;
+
+    private static final MockSettings LENIENT_SETTINGS = 
withSettings().strictness(Strictness.LENIENT);
+
+    private ScheduledExecutorService scheduler;
+
+    private ClockService clocService;
+
+    private ConfigurationValue<Long> idempotentCacheTtlConfigurationValue;
+
+    private IdempotentCacheVacuumizer vacuumizer;
+
+    @BeforeEach
+    public void setup() {
+        scheduler = Executors.newSingleThreadScheduledExecutor();
+        clocService = new TestClockService(new TestHybridClock(() -> 1L));
+
+        idempotentCacheTtlConfigurationValue = mock(ConfigurationValue.class, 
LENIENT_SETTINGS);
+        when(idempotentCacheTtlConfigurationValue.value()).thenReturn(0L);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (vacuumizer != null) {
+            vacuumizer.beforeNodeStop();
+            assertThat(vacuumizer.stopAsync(), willCompleteSuccessfully());
+        }
+
+        scheduler.shutdown();
+    }
+
+    /**
+     * Check that IdempotentCacheVacuumizer triggers vacuumization action.
+     * <ol>
+     *     <li>Ensure that until starting, vacuumizer will not trigger the 
vacuumization action.</li>
+     *     <li>Start vacuumization triggering and verify that vacuumization 
action was called.</li>
+     *     <li>Suspend vacuumization triggering and verify that vacuumization 
action calls were suspended.</li>
+     *     <li>Start vacuumization triggering and verify that vacuumization 
action was called.</li>
+     * </ol>
+     *
+     * @throws Exception if Thread.sleep() was interrupted.
+     */
+    @Test
+    public void testIdempotentCacheVacuumizer() throws Exception {
+        AtomicInteger touchCounter = new AtomicInteger(0);
+
+        IdempotentCacheVacuumizer vacuumizer = new IdempotentCacheVacuumizer(
+                "Node1",
+                scheduler,
+                ignored -> touchCounter.incrementAndGet(),
+                idempotentCacheTtlConfigurationValue,
+                clocService,
+                0,
+                1,
+                TimeUnit.MILLISECONDS
+        );
+
+        assertThat(vacuumizer.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
+
+        // Ensure that until starting, vacuumizer will not trigger the 
vacuumization action. It's a best-effort check.
+        Thread.sleep(10);
+        assertEquals(0, touchCounter.get());
+
+        // Start vacuumization triggering and verify that vacuumization action 
was called.
+        vacuumizer.startLocalVacuumizationTriggering();
+        assertTrue(waitForCondition(
+                () -> touchCounter.get() > 0,
+                TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS)
+        );
+
+        // Suspend vacuumization triggering and verify that vacuumization 
action calls were suspended.
+        vacuumizer.suspendLocalVacuumizationTriggering();
+        int touchCounterAfterStopTriggered = touchCounter.get();
+        assertTrue(waitForCondition(
+                () -> touchCounter.get() == touchCounterAfterStopTriggered || 
touchCounter.get() == touchCounterAfterStopTriggered + 1,
+                TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS)
+        );
+
+        // Start vacuumization triggering and verify that vacuumization action 
was called.
+        vacuumizer.startLocalVacuumizationTriggering();
+        assertTrue(waitForCondition(
+                () -> touchCounter.get() > touchCounterAfterStopTriggered + 1,
+                TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS)
+        );
+    }
+
+    /**
+     * Check that IdempotentCacheVacuumizer doesn't trigger vacuumization 
action after shutdown.
+     * <ol>
+     *     <li>Start vacuumization triggering and verify that vacuumization 
action was called.</li>
+     *     <li>Shutdown the vacuumizer scheduler and check that action calls 
were stopped.</li>
+     *     <li>Start the vacuumizer and check that it doesn't take any 
effect.</li>
+     *     <li>Suspend vacuumization triggering and check that it doesn't take 
any effect.</li>
+     * </ol>
+     *
+     * @throws Exception if Thread.sleep() was interrupted.
+     */
+    @Test
+    public void testIdempotentCacheVacuumizerAfterShutdown() throws Exception {
+        AtomicInteger touchCounter = new AtomicInteger(0);
+
+        IdempotentCacheVacuumizer vacuumizer = new IdempotentCacheVacuumizer(
+                "Node1",
+                scheduler,
+                ignored -> touchCounter.incrementAndGet(),
+                idempotentCacheTtlConfigurationValue,
+                clocService,
+                0,
+                1,
+                TimeUnit.MILLISECONDS
+        );
+
+        assertThat(vacuumizer.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
+
+        // Start vacuumization triggering and verify that vacuumization action 
was called.
+        vacuumizer.startLocalVacuumizationTriggering();
+        assertTrue(waitForCondition(
+                () -> touchCounter.get() > 0,
+                TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS)
+        );
+
+        // Shutdown the vacuumizer scheduler and check that action calls were 
stopped.
+        scheduler.shutdown();
+        int touchCounterAfterShutdown = touchCounter.get();
+        Thread.sleep(10);
+        assertTrue(touchCounter.get() == touchCounterAfterShutdown || 
touchCounter.get() == touchCounterAfterShutdown + 1);
+
+        // Start the vacuumizer and check that it doesn't take any effect.
+        vacuumizer.startLocalVacuumizationTriggering();
+        Thread.sleep(10);
+        assertTrue(touchCounter.get() == touchCounterAfterShutdown || 
touchCounter.get() == touchCounterAfterShutdown + 1);
+
+        // Suspend vacuumization triggering and check that it doesn't take any 
effect.
+        vacuumizer.suspendLocalVacuumizationTriggering();
+        Thread.sleep(10);
+        assertTrue(touchCounter.get() == touchCounterAfterShutdown || 
touchCounter.get() == touchCounterAfterShutdown + 1);
+    }
+
+    /**
+     * Check that IdempotentCacheVacuumizer doesn't stops on exception in 
vacuumization action doesn't re-throw it to the outer environment
+     * but logs an exception with WARN.
+     *
+     * @throws Exception if Thread.sleep() was interrupted.
+     */
+    @Test
+    public void testIdempotentCacheExceptionHandling() throws Exception {
+        AtomicInteger touchCounter = new AtomicInteger(0);
+
+        Consumer<HybridTimestamp> vacuumizationActionStub = ignored -> {
+            touchCounter.incrementAndGet();
+            throw new IllegalStateException();
+        };
+
+        IdempotentCacheVacuumizer vacuumizer = new IdempotentCacheVacuumizer(
+                "Node1",
+                scheduler,
+                vacuumizationActionStub,
+                idempotentCacheTtlConfigurationValue,
+                clocService,
+                0,
+                1,
+                TimeUnit.MILLISECONDS
+        );
+
+        assertThat(vacuumizer.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
+
+        // Start vacuumization triggering and verify that vacuumization 
actions were not stopped after exception.
+        vacuumizer.startLocalVacuumizationTriggering();
+
+        assertTrue(waitForCondition(
+                () -> touchCounter.get() > 1,
+                TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS)
+        );
+    }
+}
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index da3cf1b836..7c5b0d18bb 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
@@ -196,9 +195,7 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
                     clock,
                     topologyAwareRaftGroupServiceFactory,
                     new NoOpMetricManager(),
-                    metaStorageConfiguration,
-                    raftConfiguration.retryTimeout(),
-                    completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+                    metaStorageConfiguration
             );
 
             clockWaiter = new ClockWaiter(clusterService.nodeName(), clock);
@@ -415,10 +412,10 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
             ), willCompleteSuccessfully());
         }
 
-        for (Node node : nodes) {
-            node.metaStorageManager.evictIdempotentCommandsCache();
-        }
+        HybridTimestamp evictionTimestamp = 
HybridTimestamp.hybridTimestamp(nodes.get(0).clockService.nowLong()
+                - (raftConfiguration.retryTimeout().value() + 
nodes.get(0).clockService.maxClockSkewMillis()));
 
+        
assertThat(nodes.get(0).metaStorageManager.evictIdempotentCommandsCache(evictionTimestamp),
 willCompleteSuccessfully());
 
         // Run same idempotent command one more time and check that condition 
**was** re-evaluated and not retrieved from the cache.
         CompletableFuture<Object> commandProcessingResultFuture3 = 
raftClient().run(idempotentCommand);
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index 8275484bf8..3c8a05542c 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.metastorage.impl;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
-import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
 import static 
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
@@ -146,9 +145,7 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
                 clock,
                 topologyAwareRaftGroupServiceFactory,
                 new NoOpMetricManager(),
-                metaStorageConfiguration,
-                raftConfiguration.retryTimeout(),
-                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+                metaStorageConfiguration
         );
 
         assertThat(
@@ -235,9 +232,7 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
                 storage,
                 new HybridClockImpl(),
                 mock(TopologyAwareRaftGroupServiceFactory.class),
-                new NoOpMetricManager(),
-                raftConfiguration.retryTimeout(),
-                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+                new NoOpMetricManager()
         );
 
         assertThat(metaStorageManager.stopAsync(new ComponentContext()), 
willCompleteSuccessfully());
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index d6b4941299..dd5d620b4b 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -18,9 +18,7 @@
 package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
-import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
@@ -205,9 +203,7 @@ public abstract class 
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
                     clock,
                     topologyAwareRaftGroupServiceFactory,
                     new NoOpMetricManager(),
-                    metaStorageConfiguration,
-                    raftConfiguration.retryTimeout(),
-                    completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+                    metaStorageConfiguration
             );
 
             deployWatchesFut = metaStorageManager.deployWatches();
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
index 366a103993..ccf4eaffd0 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.nio.charset.StandardCharsets;
@@ -160,12 +158,7 @@ public class ItMetaStorageServicePersistenceTest extends 
ItAbstractListenerSnaps
             return s;
         });
 
-        return new MetaStorageListener(
-                storage,
-                new ClusterTimeImpl(nodeName, new IgniteSpinBusyLock(), new 
HybridClockImpl()),
-                raftConfiguration.retryTimeout(),
-                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
-        );
+        return new MetaStorageListener(storage, new ClusterTimeImpl(nodeName, 
new IgniteSpinBusyLock(), new HybridClockImpl()));
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index 3ab7ebf264..559e617ee7 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toUnmodifiableSet;
-import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.and;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
@@ -189,11 +187,8 @@ public class ItMetaStorageServiceTest extends 
BaseIgniteAbstractTest {
 
         private MetaStorageService metaStorageService;
 
-        private RaftConfiguration raftConfiguration;
-
         Node(ClusterService clusterService, RaftConfiguration 
raftConfiguration, Path dataPath) {
             this.clusterService = clusterService;
-            this.raftConfiguration = raftConfiguration;
 
             HybridClock clock = new HybridClockImpl();
 
@@ -239,12 +234,7 @@ public class ItMetaStorageServiceTest extends 
BaseIgniteAbstractTest {
 
             assert peer != null;
 
-            var listener = new MetaStorageListener(
-                    mockStorage,
-                    clusterTime,
-                    raftConfiguration.retryTimeout(),
-                    completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
-            );
+            var listener = new MetaStorageListener(mockStorage, clusterTime);
 
             var raftNodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, peer);
 
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index 8bcaf65ecb..05b7c8f1b5 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
-import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -195,9 +193,7 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
                     clock,
                     topologyAwareRaftGroupServiceFactory,
                     new NoOpMetricManager(),
-                    metaStorageConfiguration,
-                    raftConfiguration.retryTimeout(),
-                    completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+                    metaStorageConfiguration
             );
 
             components.add(metaStorageManager);
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
index 50894e927e..e1ce03451d 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.ignite.internal.metastorage.server.raft;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.collectingAndThen;
 import static java.util.stream.Collectors.toSet;
-import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.waitForTopology;
 import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
@@ -83,6 +81,7 @@ import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -221,6 +220,7 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
      * @throws Exception If failed.
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-22891";)
     public void testRangeNextWorksCorrectlyAfterLeaderChange() throws 
Exception {
         AtomicInteger replicatorStartedCounter = new AtomicInteger(0);
 
@@ -411,12 +411,7 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
         metaStorageRaftSrv1.startRaftNode(
                 raftNodeId1,
                 membersConfiguration,
-                new MetaStorageListener(
-                        mockStorage,
-                        mock(ClusterTimeImpl.class),
-                        raftConfiguration.retryTimeout(),
-                        completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
-                ),
+                new MetaStorageListener(mockStorage, 
mock(ClusterTimeImpl.class)),
                 defaults()
         );
 
@@ -425,12 +420,7 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
         metaStorageRaftSrv2.startRaftNode(
                 raftNodeId2,
                 membersConfiguration,
-                new MetaStorageListener(
-                        mockStorage,
-                        mock(ClusterTimeImpl.class),
-                        raftConfiguration.retryTimeout(),
-                        completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
-                ),
+                new MetaStorageListener(mockStorage, 
mock(ClusterTimeImpl.class)),
                 defaults()
         );
 
@@ -439,12 +429,7 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
         metaStorageRaftSrv3.startRaftNode(
                 raftNodeId3,
                 membersConfiguration,
-                new MetaStorageListener(
-                        mockStorage,
-                        mock(ClusterTimeImpl.class),
-                        raftConfiguration.retryTimeout(),
-                        completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
-                ),
+                new MetaStorageListener(mockStorage, 
mock(ClusterTimeImpl.class)),
                 defaults()
         );
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/EvictIdempotentCommandsCacheCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/EvictIdempotentCommandsCacheCommand.java
new file mode 100644
index 0000000000..21d6bfbacd
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/EvictIdempotentCommandsCacheCommand.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.command;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.annotations.Transferable;
+
+/**
+ * Removes obsolete entries from both volatile and persistent idempotent 
command cache.
+ */
+@Transferable(MetastorageCommandsMessageGroup.EVICT_IDEMPOTENT_COMMAND_CACHE)
+public interface EvictIdempotentCommandsCacheCommand extends 
MetaStorageWriteCommand {
+    /** Cached entries older than given timestamp will be evicted. */
+    HybridTimestamp evictionTimestamp();
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
index 0967b84d5c..819d02717d 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
@@ -61,4 +61,7 @@ public interface MetastorageCommandsMessageGroup {
 
     /** Message type for {@link SyncTimeCommand}. */
     short SYNC_TIME = 70;
+
+    /** Message type for {@link EvictIdempotentCommandsCacheCommand}. */
+    short EVICT_IDEMPOTENT_COMMAND_CACHE = 71;
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
index 016f6c0038..4a493c228f 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
@@ -118,11 +118,12 @@ public class MetaStorageLeaderElectionListener implements 
LeaderElectionListener
                 
logicalTopologyService.addEventListener(logicalTopologyEventListener);
 
                 metaStorageSvcFut
-                        .thenAcceptBoth(metaStorageConfigurationFuture, 
(service, metaStorageConfiguration) ->
-                                clusterTime.startSafeTimeScheduler(
-                                        safeTime -> service.syncTime(safeTime, 
term),
-                                        metaStorageConfiguration
-                                ))
+                        .thenAcceptBoth(metaStorageConfigurationFuture, 
(service, metaStorageConfiguration) -> {
+                            clusterTime.startSafeTimeScheduler(
+                                    safeTime -> service.syncTime(safeTime, 
term),
+                                    metaStorageConfiguration
+                            );
+                        })
                         .whenComplete((v, e) -> {
                             if (e != null) {
                                 LOG.error("Unable to start Idle Safe Time 
scheduler", e);
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 4a388c45fa..30ac53b403 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.concurrent.TimeUnit.MINUTES;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -29,13 +28,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.LongSupplier;
-import org.apache.ignite.configuration.ConfigurationValue;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -72,7 +67,6 @@ import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -137,17 +131,10 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
 
     private volatile MetaStorageConfiguration metaStorageConfiguration;
 
-    private final ConfigurationValue<Long> idempotentCacheTtl;
-
-    private final CompletableFuture<LongSupplier> maxClockSkewMillisFuture;
-
     private volatile MetaStorageListener followerListener;
 
     private volatile MetaStorageListener learnerListener;
 
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Remove, cache 
eviction should be triggered by MS GC instead.
-    private final ScheduledExecutorService idempotentCacheVacumizer;
-
     private final List<ElectionListener> electionListeners = new 
CopyOnWriteArrayList<>(); 
 
     /**
@@ -160,7 +147,6 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
      * @param storage Storage. This component owns this resource and will 
manage its lifecycle.
      * @param clock A hybrid logical clock.
      * @param metricManager Metric manager.
-     * @param maxClockSkewMillisFuture Future with maximum clock skew in 
milliseconds.
      */
     public MetaStorageManagerImpl(
             ClusterService clusterService,
@@ -170,9 +156,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
             KeyValueStorage storage,
             HybridClock clock,
             TopologyAwareRaftGroupServiceFactory 
topologyAwareRaftGroupServiceFactory,
-            MetricManager metricManager,
-            ConfigurationValue<Long> idempotentCacheTtl,
-            CompletableFuture<LongSupplier> maxClockSkewMillisFuture
+            MetricManager metricManager
     ) {
         this.clusterService = clusterService;
         this.raftMgr = raftMgr;
@@ -183,10 +167,6 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         this.metaStorageMetricSource = new 
MetaStorageMetricSource(clusterTime);
         this.topologyAwareRaftGroupServiceFactory = 
topologyAwareRaftGroupServiceFactory;
         this.metricManager = metricManager;
-        this.idempotentCacheTtl = idempotentCacheTtl;
-        this.maxClockSkewMillisFuture = maxClockSkewMillisFuture;
-        this.idempotentCacheVacumizer = 
Executors.newSingleThreadScheduledExecutor(
-                NamedThreadFactory.create(clusterService.nodeName(), 
"idempotent-cache-vacumizer", LOG));
     }
 
     /**
@@ -202,9 +182,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
             HybridClock clock,
             TopologyAwareRaftGroupServiceFactory 
topologyAwareRaftGroupServiceFactory,
             MetricManager metricManager,
-            MetaStorageConfiguration configuration,
-            ConfigurationValue<Long> idempotentCacheTtl,
-            CompletableFuture<LongSupplier> maxClockSkewMillisFuture
+            MetaStorageConfiguration configuration
     ) {
         this(
                 clusterService,
@@ -214,9 +192,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
                 storage,
                 clock,
                 topologyAwareRaftGroupServiceFactory,
-                metricManager,
-                idempotentCacheTtl,
-                maxClockSkewMillisFuture
+                metricManager
         );
 
         configure(configuration);
@@ -338,12 +314,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
 
         assert localMetaStorageConfiguration != null : "Meta Storage 
configuration has not been set";
 
-        followerListener = new MetaStorageListener(
-                storage,
-                clusterTime,
-                idempotentCacheTtl,
-                maxClockSkewMillisFuture
-        );
+        followerListener = new MetaStorageListener(storage, clusterTime);
 
         CompletableFuture<TopologyAwareRaftGroupService> raftServiceFuture = 
raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
                 new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
@@ -387,12 +358,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
 
         assert localPeer != null;
 
-        learnerListener = new MetaStorageListener(
-                storage,
-                clusterTime,
-                idempotentCacheTtl,
-                maxClockSkewMillisFuture
-        );
+        learnerListener = new MetaStorageListener(storage, clusterTime);
 
         return raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
                 new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
@@ -457,8 +423,6 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
 
         busyLock.block();
 
-        idempotentCacheVacumizer.shutdownNow();
-
         deployWatchesFuture.cancel(true);
 
         recoveryFinishedFuture.cancel(true);
@@ -523,8 +487,6 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
                                 
MetaStorageManagerImpl.this.onRevisionApplied(revision);
                             }
                         });
-
-                        
idempotentCacheVacumizer.scheduleWithFixedDelay(this::evictIdempotentCommandsCache,
 1, 1, MINUTES);
                     }))
                     .whenComplete((v, e) -> {
                         if (e == null) {
@@ -912,16 +874,20 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     }
 
     /**
-     * Removes obsolete entries from both volatile and persistent idempotent 
command cache.
+     * Removes obsolete entries from both volatile and persistent idempotent 
command cache older than evictionTimestamp.
+     *
+     * @param evictionTimestamp Cached entries older than given timestamp will 
be evicted.
+     * @return Pending operation future.
      */
-    @Deprecated(forRemoval = true)
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 cache eviction 
should be triggered by MS GC instead.
-    public void evictIdempotentCommandsCache() {
-        if (followerListener != null) {
-            followerListener.evictIdempotentCommandsCache();
-        }
-        if (learnerListener != null) {
-            learnerListener.evictIdempotentCommandsCache();
+    public CompletableFuture<Void> 
evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp) {
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> 
svc.evictIdempotentCommandsCache(evictionTimestamp));
+        } finally {
+            busyLock.leaveBusy();
         }
     }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index 86b323f4e8..3025895273 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import 
org.apache.ignite.internal.metastorage.command.EvictIdempotentCommandsCacheCommand;
 import org.apache.ignite.internal.metastorage.command.GetAllCommand;
 import org.apache.ignite.internal.metastorage.command.GetCommand;
 import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
@@ -268,6 +269,22 @@ public class MetaStorageServiceImpl implements 
MetaStorageService {
         return context.raftService().run(cmd);
     }
 
+    /**
+     * Removes obsolete entries from both volatile and persistent idempotent 
command cache.
+     *
+     * @param evictionTimestamp Cached entries older than given timestamp will 
be evicted.
+     * @return Pending operation future.
+     */
+    CompletableFuture<Void> evictIdempotentCommandsCache(HybridTimestamp 
evictionTimestamp) {
+        EvictIdempotentCommandsCacheCommand 
evictIdempotentCommandsCacheCommand = evictIdempotentCommandsCacheCommand(
+                context.commandsFactory(),
+                evictionTimestamp,
+                clusterTime.now()
+        );
+
+        return context.raftService().run(evictIdempotentCommandsCacheCommand);
+    }
+
     @Override
     public void close() {
         context.close();
@@ -332,4 +349,19 @@ public class MetaStorageServiceImpl implements 
MetaStorageService {
 
         return 
commandsFactory.removeAllCommand().keys(list).initiatorTime(ts).build();
     }
+
+    /**
+     * Creates evict idempotent commands cache command.
+     *
+     * @param commandsFactory Commands factory.
+     * @param evictionTimestamp Cached entries older than given timestamp will 
be evicted.
+     * @param ts Local time.
+     */
+    private EvictIdempotentCommandsCacheCommand 
evictIdempotentCommandsCacheCommand(
+            MetaStorageCommandsFactory commandsFactory,
+            HybridTimestamp evictionTimestamp,
+            HybridTimestamp ts
+    ) {
+        return 
commandsFactory.evictIdempotentCommandsCacheCommand().evictionTimestamp(evictionTimestamp).initiatorTime(ts).build();
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 3a9ca2a993..3159069f15 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -270,6 +270,14 @@ public interface KeyValueStorage extends ManuallyCloseable 
{
      */
     HybridTimestamp timestampByRevision(long revision);
 
+    /**
+     * Looks a revision lesser or equal to the timestamp.
+     *
+     * @param timestamp Timestamp by which to do a lookup.
+     * @return Revision lesser or equal to the timestamp or -1 if there is no 
such revision.
+     */
+    long revisionByTimestamp(HybridTimestamp timestamp);
+
     /**
      * Sets the revision listener. This is needed only for the recovery, after 
that listener must be set to {@code null}.
      * {@code null} means that we no longer must be notified of revision 
updates for recovery, because recovery is finished.
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index d58efd4256..713a85951f 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -977,23 +977,12 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
 
         try (WriteBatch batch = new WriteBatch()) {
-            byte[] tsBytes = hybridTsToArray(lowWatermark);
-            long maxRevision;
-
             // Find a revision with timestamp lesser or equal to the watermark.
-            try (RocksIterator rocksIterator = tsToRevision.newIterator()) {
-                rocksIterator.seekForPrev(tsBytes);
-
-                RocksUtils.checkIterator(rocksIterator);
-
-                byte[] tsValue = rocksIterator.value();
-
-                if (tsValue.length == 0) {
-                    // Nothing to compact yet.
-                    return;
-                }
+            long maxRevision = revisionByTimestamp(lowWatermark);
 
-                maxRevision = bytesToLong(tsValue);
+            if (maxRevision == -1) {
+                // Nothing to compact yet.
+                return;
             }
 
             try (RocksIterator iterator = index.newIterator()) {
@@ -1520,6 +1509,26 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         }
     }
 
+    @Override
+    public long revisionByTimestamp(HybridTimestamp timestamp) {
+        byte[] tsBytes = hybridTsToArray(timestamp);
+
+        // Find a revision with timestamp lesser or equal to the watermark.
+        try (RocksIterator rocksIterator = tsToRevision.newIterator()) {
+            rocksIterator.seekForPrev(tsBytes);
+
+            RocksUtils.checkIterator(rocksIterator);
+
+            byte[] tsValue = rocksIterator.value();
+
+            if (tsValue.length == 0) {
+                return -1;
+            }
+
+            return bytesToLong(tsValue);
+        }
+    }
+
     private void finishReplay() {
         // Take the lock to drain the event cache and prevent new events from 
being cached. Since event notification is asynchronous,
         // this lock shouldn't be held for long.
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index 5659e37227..8e4780e58b 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -26,10 +26,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
-import java.util.function.LongSupplier;
-import org.apache.ignite.configuration.ConfigurationValue;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.command.GetAllCommand;
@@ -67,17 +64,10 @@ public class MetaStorageListener implements 
RaftGroupListener, BeforeApplyHandle
      */
     public MetaStorageListener(
             KeyValueStorage storage,
-            ClusterTimeImpl clusterTime,
-            ConfigurationValue<Long> idempotentCacheTtl,
-            CompletableFuture<LongSupplier> maxClockSkewMillisFuture
+            ClusterTimeImpl clusterTime
     ) {
         this.storage = storage;
-        this.writeHandler = new MetaStorageWriteHandler(
-                storage,
-                clusterTime,
-                idempotentCacheTtl,
-                maxClockSkewMillisFuture
-        );
+        this.writeHandler = new MetaStorageWriteHandler(storage, clusterTime);
     }
 
     @Override
@@ -191,13 +181,4 @@ public class MetaStorageListener implements 
RaftGroupListener, BeforeApplyHandle
     @Override
     public void onShutdown() {
     }
-
-    /**
-     * Removes obsolete entries from both volatile and persistent idempotent 
command cache.
-     */
-    @Deprecated(forRemoval = true)
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 cache eviction 
should be triggered by MS GC instead.
-    public void evictIdempotentCommandsCache() {
-        writeHandler.evictIdempotentCommandsCache();
-    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index bc226520f3..e63b29fff5 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -28,17 +28,15 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.LongSupplier;
-import org.apache.ignite.configuration.ConfigurationValue;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.CommandId;
 import org.apache.ignite.internal.metastorage.Entry;
+import 
org.apache.ignite.internal.metastorage.command.EvictIdempotentCommandsCacheCommand;
 import org.apache.ignite.internal.metastorage.command.IdempotentCommand;
 import org.apache.ignite.internal.metastorage.command.InvokeCommand;
 import org.apache.ignite.internal.metastorage.command.MetaStorageWriteCommand;
@@ -69,7 +67,6 @@ import 
org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
-import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
@@ -88,22 +85,14 @@ public class MetaStorageWriteHandler {
     private final KeyValueStorage storage;
     private final ClusterTimeImpl clusterTime;
 
-    private final Map<CommandId, IdempotentCommandCachedResult> 
idempotentCommandCache = new ConcurrentHashMap<>();
-
-    private final ConfigurationValue<Long> idempotentCacheTtl;
-
-    private final CompletableFuture<LongSupplier> maxClockSkewMillisFuture;
+    private final Map<CommandId, @Nullable Serializable> 
idempotentCommandCache = new ConcurrentHashMap<>();
 
     MetaStorageWriteHandler(
             KeyValueStorage storage,
-            ClusterTimeImpl clusterTime,
-            ConfigurationValue<Long> idempotentCacheTtl,
-            CompletableFuture<LongSupplier> maxClockSkewMillisFuture
+            ClusterTimeImpl clusterTime
     ) {
         this.storage = storage;
         this.clusterTime = clusterTime;
-        this.idempotentCacheTtl = idempotentCacheTtl;
-        this.maxClockSkewMillisFuture = maxClockSkewMillisFuture;
     }
 
     /**
@@ -118,10 +107,10 @@ public class MetaStorageWriteHandler {
             IdempotentCommand idempotentCommand = ((IdempotentCommand) 
command);
             CommandId commandId = idempotentCommand.id();
 
-            IdempotentCommandCachedResult cachedResult = 
idempotentCommandCache.get(commandId);
+            Serializable cachedResult = idempotentCommandCache.get(commandId);
 
             if (cachedResult != null) {
-                clo.result(cachedResult.result);
+                clo.result(cachedResult);
 
                 return;
             } else {
@@ -215,6 +204,11 @@ public class MetaStorageWriteHandler {
         } else if (command instanceof SyncTimeCommand) {
             storage.advanceSafeTime(command.safeTime());
 
+            clo.result(null);
+        } else if (command instanceof EvictIdempotentCommandsCacheCommand) {
+            EvictIdempotentCommandsCacheCommand cmd = 
(EvictIdempotentCommandsCacheCommand) command;
+            evictIdempotentCommandsCache(cmd.evictionTimestamp(), opTime);
+
             clo.result(null);
         }
     }
@@ -352,8 +346,6 @@ public class MetaStorageWriteHandler {
         byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
 
         Cursor<Entry> cursor = storage.range(keyFrom, keyTo);
-        // It's fine to lose original command start time - in that case we 
will store the entry a little bit longer that necessary.
-        HybridTimestamp now = clusterTime.now();
 
         try (cursor) {
             for (Entry entry : cursor) {
@@ -368,7 +360,7 @@ public class MetaStorageWriteHandler {
                         result = 
MSG_FACTORY.statementResult().result(ByteBuffer.wrap(entry.value())).build();
                     }
 
-                    idempotentCommandCache.put(commandId, new 
IdempotentCommandCachedResult(result, now));
+                    idempotentCommandCache.put(commandId, result);
                 }
             }
         }
@@ -376,48 +368,45 @@ public class MetaStorageWriteHandler {
 
     /**
      * Removes obsolete entries from both volatile and persistent idempotent 
command cache.
+     *
+     * @param evictionTimestamp Cached entries older than given timestamp will 
be evicted.
+     * @param operationTimestamp Command operation timestamp.
      */
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Call on meta 
storage compaction.
-    void evictIdempotentCommandsCache() {
-        HybridTimestamp cleanupTimestamp = clusterTime.now();
-        LOG.info("Idempotent command cache cleanup started 
[cleanupTimestamp={}].", cleanupTimestamp);
-
-        maxClockSkewMillisFuture.thenAccept(maxClockSkewMillis -> {
-            List<CommandId> commandIdsToRemove = 
idempotentCommandCache.entrySet().stream()
-                    .filter(entry -> 
entry.getValue().commandStartTime.getPhysical()
-                            <= cleanupTimestamp.getPhysical() - 
(idempotentCacheTtl.value() + maxClockSkewMillis.getAsLong()))
-                    .map(Map.Entry::getKey)
+    void evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp, 
HybridTimestamp operationTimestamp) {
+        LOG.info("Idempotent command cache cleanup started 
[evictionTimestamp={}].", evictionTimestamp);
+
+        long obsoleteRevision = storage.revisionByTimestamp(evictionTimestamp);
+
+        if (obsoleteRevision != -1) {
+            byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
+            byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
+
+            List<byte[]> evictionCandidateKeys = storage.range(keyFrom, keyTo, 
obsoleteRevision).stream()
+                    // Not sure whether it's possible to retrieve empty entry 
here, thus !entry.empty() was added just in case.
+                    .filter(entry -> !entry.tombstone() && !entry.empty())
+                    .map(Entry::key)
                     .collect(toList());
 
-            if (!commandIdsToRemove.isEmpty()) {
-                List<byte[]> commandIdStorageKeys = commandIdsToRemove.stream()
-                        .map(commandId -> ArrayUtils.concat(new byte[]{}, 
ByteUtils.toBytes(commandId)))
-                        .collect(toList());
+            // TODO https://issues.apache.org/jira/browse/IGNITE-22828
+            evictionCandidateKeys.forEach(evictionCandidateKeyBytes -> {
+                CommandId commandId = ByteUtils.fromBytes(
+                        evictionCandidateKeyBytes,
+                        IDEMPOTENT_COMMAND_PREFIX_BYTES.length,
+                        evictionCandidateKeyBytes.length
+                );
 
-                storage.removeAll(commandIdStorageKeys, null);
+                idempotentCommandCache.remove(commandId);
+            });
 
-                
commandIdsToRemove.forEach(idempotentCommandCache.keySet()::remove);
-            }
+            storage.removeAll(evictionCandidateKeys, operationTimestamp);
 
-            LOG.info("Idempotent command cache cleanup finished 
[cleanupTimestamp={}, cleanupCompletionTimestamp={},"
+            LOG.info("Idempotent command cache cleanup finished 
[evictionTimestamp={}, cleanupCompletionTimestamp={},"
                             + " removedEntriesCount={}, cacheSize={}].",
-                    cleanupTimestamp,
+                    evictionTimestamp,
                     clusterTime.now(),
-                    commandIdsToRemove.size(),
+                    evictionCandidateKeys.size(),
                     idempotentCommandCache.size()
             );
-        });
-    }
-
-    private static class IdempotentCommandCachedResult {
-        @Nullable
-        final Serializable result;
-
-        final HybridTimestamp commandStartTime;
-
-        IdempotentCommandCachedResult(@Nullable Serializable result, 
HybridTimestamp commandStartTime) {
-            this.result = result;
-            this.commandStartTime = commandStartTime;
         }
     }
 
@@ -451,7 +440,7 @@ public class MetaStorageWriteHandler {
 
             // Exceptions are not cached.
             if (!(res instanceof Throwable)) {
-                idempotentCommandCache.put(command.id(), new 
IdempotentCommandCachedResult(res, command.initiatorTime()));
+                idempotentCommandCache.put(command.id(), res);
             }
 
             closure.result(res);
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
index b79593ad5e..f41319401a 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
@@ -36,7 +34,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.lang.ByteArray;
@@ -52,7 +49,6 @@ import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStora
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -81,18 +77,10 @@ public class IdempotentCommandCacheTest extends 
BaseIgniteAbstractTest {
 
     private final CommandIdGenerator commandIdGenerator = new 
CommandIdGenerator(() -> UUID.randomUUID().toString());
 
-    @InjectConfiguration
-    private RaftConfiguration raftConfiguration;
-
     @BeforeEach
     public void setUp() {
         storage = new SimpleInMemoryKeyValueStorage(NODE_NAME);
-        metaStorageListener = new MetaStorageListener(
-                storage,
-                new ClusterTimeImpl(NODE_NAME, new IgniteSpinBusyLock(), 
clock),
-                raftConfiguration.retryTimeout(),
-                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
-        );
+        metaStorageListener = new MetaStorageListener(storage, new 
ClusterTimeImpl(NODE_NAME, new IgniteSpinBusyLock(), clock));
     }
 
     @Test
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
index c548f0d734..d6fc9c0431 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -44,7 +43,6 @@ import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
-import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -58,9 +56,6 @@ public class MetaStorageDeployWatchesCorrectnessTest extends 
IgniteAbstractTest
     @InjectConfiguration
     private static MetaStorageConfiguration metaStorageConfiguration;
 
-    @InjectConfiguration
-    private static RaftConfiguration raftConfiguration;
-
     /**
      * Returns a stream with test arguments.
      *
@@ -92,9 +87,7 @@ public class MetaStorageDeployWatchesCorrectnessTest extends 
IgniteAbstractTest
                         clock,
                         mock(TopologyAwareRaftGroupServiceFactory.class),
                         new NoOpMetricManager(),
-                        metaStorageConfiguration,
-                        raftConfiguration.retryTimeout(),
-                        completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+                        metaStorageConfiguration
                 ),
                 StandaloneMetaStorageManager.create()
         );
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
index e998f62024..14da02ec7c 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -50,7 +49,6 @@ import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.internal.raft.RaftManager;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
-import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.network.NodeMetadata;
@@ -67,9 +65,6 @@ public class MetaStorageManagerRecoveryTest extends 
BaseIgniteAbstractTest {
     @InjectConfiguration
     private static MetaStorageConfiguration metaStorageConfiguration;
 
-    @InjectConfiguration
-    private static RaftConfiguration raftConfiguration;
-
     private MetaStorageManagerImpl metaStorageManager;
 
     private KeyValueStorage kvs;
@@ -94,9 +89,7 @@ public class MetaStorageManagerRecoveryTest extends 
BaseIgniteAbstractTest {
                 clock,
                 mock(TopologyAwareRaftGroupServiceFactory.class),
                 new NoOpMetricManager(),
-                metaStorageConfiguration,
-                raftConfiguration.retryTimeout(),
-                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+                metaStorageConfiguration
         );
     }
 
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index d6b695729a..a27ce6062e 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.metastorage.server;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.function.Function.identity;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.MAX_VALUE;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
@@ -2093,6 +2096,63 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         verify(mockCallback, never()).onRevisionApplied(anyLong());
     }
 
+    @Test
+    public void testRevisionByTimestamp() {
+        // Verify that in case of empty storage -1 will be returned.
+        assertEquals(-1, storage.revisionByTimestamp(MIN_VALUE));
+        assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(2)));
+        assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(5)));
+        assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(7)));
+        assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(10)));
+        assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(12)));
+        assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(15)));
+        assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(17)));
+        assertEquals(-1, storage.revisionByTimestamp(MAX_VALUE));
+
+        // Populate storage with some data in order to have following revision 
to timestamp mapping:
+        // 1 -> 5
+        // 2 -> 10
+        // 3 -> 15
+        {
+            storage.put(key(1), keyValue(1, 1), hybridTimestamp(5));
+            assertEquals(1, storage.revision());
+
+            storage.put(key(1), keyValue(1, 1), hybridTimestamp(10));
+            assertEquals(2, storage.revision());
+
+            storage.put(key(2), keyValue(2, 2), hybridTimestamp(15));
+            assertEquals(3, storage.revision());
+        }
+
+        // Check revisionByTimestamp()
+        {
+            assertEquals(-1, storage.revisionByTimestamp(MIN_VALUE));
+
+            // There's no revision associated with 2, so closest left one is 
expected.
+            assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(2)));
+
+            // Exact matching 1 -> 5
+            assertEquals(1, storage.revisionByTimestamp(hybridTimestamp(5)));
+
+            // There's no revision associated with 7, so closest left one is 
expected.
+            assertEquals(1, storage.revisionByTimestamp(hybridTimestamp(7)));
+
+            // Exact matching 2 -> 10
+            assertEquals(2, storage.revisionByTimestamp(hybridTimestamp(10)));
+
+            // There's no revision associated with 12, so closest left one is 
expected.
+            assertEquals(2, storage.revisionByTimestamp(hybridTimestamp(12)));
+
+            // Exact matching 3 -> 15
+            assertEquals(3, storage.revisionByTimestamp(hybridTimestamp(15)));
+
+            // There's no revision associated with 17, so closest left one is 
expected.
+            assertEquals(3, storage.revisionByTimestamp(hybridTimestamp(17)));
+
+            assertEquals(3, storage.revisionByTimestamp(MAX_VALUE));
+        }
+    }
+
     private CompletableFuture<Void> watchExact(
             byte[] key, long revision, int expectedNumCalls, 
BiConsumer<WatchEvent, Integer> testCondition
     ) {
@@ -2160,19 +2220,19 @@ public abstract class 
BasicOperationsKeyValueStorageTest extends AbstractKeyValu
     }
 
     void putToMs(byte[] key, byte[] value) {
-        storage.put(key, value, HybridTimestamp.MIN_VALUE);
+        storage.put(key, value, MIN_VALUE);
     }
 
     private void putAllToMs(List<byte[]> keys, List<byte[]> values) {
-        storage.putAll(keys, values, HybridTimestamp.MIN_VALUE);
+        storage.putAll(keys, values, MIN_VALUE);
     }
 
     private void removeFromMs(byte[] key) {
-        storage.remove(key, HybridTimestamp.MIN_VALUE);
+        storage.remove(key, MIN_VALUE);
     }
 
     private void removeAllFromMs(List<byte[]> keys) {
-        storage.removeAll(keys, HybridTimestamp.MIN_VALUE);
+        storage.removeAll(keys, MIN_VALUE);
     }
 
     private boolean invokeOnMs(Condition condition, Collection<Operation> 
success, Collection<Operation> failure) {
@@ -2180,7 +2240,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
                 condition,
                 success,
                 failure,
-                HybridTimestamp.MIN_VALUE,
+                MIN_VALUE,
                 new CommandIdGenerator(() -> 
UUID.randomUUID().toString()).newId()
         );
     }
@@ -2188,7 +2248,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
     private StatementResult invokeOnMs(If iif) {
         return storage.invoke(
                 iif,
-                HybridTimestamp.MIN_VALUE,
+                MIN_VALUE,
                 new CommandIdGenerator(() -> 
UUID.randomUUID().toString()).newId()
         );
     }
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 92623da88d..3cd5ed52fc 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.Collections.singleton;
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -29,7 +28,6 @@ import java.io.Serializable;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.LongSupplier;
 import org.apache.ignite.configuration.ConfigurationValue;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -108,9 +106,7 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
                 keyValueStorage,
                 mock(TopologyAwareRaftGroupServiceFactory.class),
                 mockConfiguration(),
-                clock,
-                mockRaftConfiguration().retryTimeout(),
-                completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+                clock
         );
     }
 
@@ -131,9 +127,7 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
             KeyValueStorage storage,
             TopologyAwareRaftGroupServiceFactory raftServiceFactory,
             MetaStorageConfiguration configuration,
-            HybridClock clock,
-            ConfigurationValue<Long> idempotentCacheTtl,
-            CompletableFuture<LongSupplier> maxClockSkewMillisFuture
+            HybridClock clock
     ) {
         super(
                 clusterService,
@@ -144,9 +138,7 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
                 clock,
                 raftServiceFactory,
                 new NoOpMetricManager(),
-                configuration,
-                idempotentCacheTtl,
-                maxClockSkewMillisFuture
+                configuration
         );
     }
 
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 406510ffb3..3822585e73 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -391,6 +391,20 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
         }
     }
 
+    @Override
+    public long revisionByTimestamp(HybridTimestamp timestamp) {
+        synchronized (mux) {
+            Map.Entry<Long, Long> revisionEntry = 
tsToRevMap.floorEntry(timestamp.longValue());
+
+            if (revisionEntry == null) {
+                // Nothing to compact yet.
+                return -1;
+            }
+
+            return revisionEntry.getValue();
+        }
+    }
+
     @Override
     public void setRecoveryRevisionListener(@Nullable LongConsumer listener) {
         synchronized (mux) {
@@ -493,15 +507,12 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
 
             NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx = 
new TreeMap<>();
 
-            Map.Entry<Long, Long> revisionEntry = 
tsToRevMap.floorEntry(lowWatermark.longValue());
+            long maxRevision = revisionByTimestamp(lowWatermark);
 
-            if (revisionEntry == null) {
-                // Nothing to compact yet.
+            if (maxRevision == -1) {
                 return;
             }
 
-            long maxRevision = revisionEntry.getValue();
-
             keysIdx.forEach((key, revs) -> compactForKey(key, revs, 
compactedKeysIdx, compactedRevsIdx, maxRevision));
 
             keysIdx = compactedKeysIdx;
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 5042c2621b..50eb3d999f 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -33,7 +33,6 @@ import static 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLi
 import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static 
org.apache.ignite.internal.testframework.TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
@@ -187,6 +186,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
@@ -524,7 +524,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
         );
     }
 
-    @Test
+    @RepeatedTest(100)
     @Disabled("https://issues.apache.org/jira/browse/IGNITE-22858";)
     void testAlterFilterTrigger(TestInfo testInfo) throws Exception {
         startNodes(testInfo, 3);
@@ -960,9 +960,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                     hybridClock,
                     topologyAwareRaftGroupServiceFactory,
                     new NoOpMetricManager(),
-                    metaStorageConfiguration,
-                    raftConfiguration.retryTimeout(),
-                    completedFuture(() -> DEFAULT_MAX_CLOCK_SKEW_MS)
+                    metaStorageConfiguration
             ) {
                 @Override
                 public CompletableFuture<Boolean> invoke(
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
index 2282eb86f5..24f7091e3e 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
@@ -266,9 +266,7 @@ public class MultiActorPlacementDriverTest extends 
BasePlacementDriverTest {
                     nodeClock,
                     topologyAwareRaftGroupServiceFactory,
                     new NoOpMetricManager(),
-                    metaStorageConfiguration,
-                    raftConfiguration.retryTimeout(),
-                    completedFuture(clockService::maxClockSkewMillis)
+                    metaStorageConfiguration
             );
 
             if (this.metaStorageManager == null) {
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index f586f89e0b..6a6f0a0edc 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -203,9 +203,7 @@ public class PlacementDriverManagerTest extends 
BasePlacementDriverTest {
                 nodeClock,
                 topologyAwareRaftGroupServiceFactory,
                 new NoOpMetricManager(),
-                metaStorageConfiguration,
-                raftConfiguration.retryTimeout(),
-                completedFuture(clockService::maxClockSkewMillis)
+                metaStorageConfiguration
         );
 
         placementDriverManager = new PlacementDriverManager(
diff --git 
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
 
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
index 2913279d95..01e8aebd08 100644
--- 
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
+++ 
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
@@ -107,7 +107,7 @@ public abstract class ItAbstractListenerSnapshotTest<T 
extends RaftGroupListener
     private ScheduledExecutorService executor;
 
     @InjectConfiguration
-    protected RaftConfiguration raftConfiguration;
+    private RaftConfiguration raftConfiguration;
 
     /**
      * Create executor for raft group services.
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 5442a588b3..5e66356dc3 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -56,6 +56,7 @@ dependencies {
     implementation project(':ignite-raft-api')
     implementation project(':ignite-raft')
     implementation project(':ignite-metastorage')
+    implementation project(':ignite-metastorage-cache')
     implementation project(':ignite-affinity')
     implementation project(':ignite-table')
     implementation project(':ignite-index')
@@ -147,6 +148,7 @@ dependencies {
         exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind'
     }
     integrationTestImplementation project(':ignite-metastorage')
+    integrationTestImplementation project(':ignite-metastorage-cache')
     integrationTestImplementation project(':ignite-metrics')
     integrationTestImplementation project(':ignite-table')
     integrationTestImplementation project(':ignite-transactions')
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index 48884adb1d..a2ad98eb88 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -18,9 +18,7 @@
 package org.apache.ignite.internal.configuration;
 
 import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toUnmodifiableList;
-import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -212,9 +210,7 @@ public class ItDistributedConfigurationPropertiesTest 
extends BaseIgniteAbstract
                     clock,
                     topologyAwareRaftGroupServiceFactory,
                     new NoOpMetricManager(),
-                    metaStorageConfiguration,
-                    raftConfiguration.retryTimeout(),
-                    completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+                    metaStorageConfiguration
             );
 
             deployWatchesFut = metaStorageManager.deployWatches();
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index cff23967a3..4ca6f6c31d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.configuration.storage;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -184,9 +182,7 @@ public class ItDistributedConfigurationStorageTest extends 
BaseIgniteAbstractTes
                     clock,
                     topologyAwareRaftGroupServiceFactory,
                     new NoOpMetricManager(),
-                    metaStorageConfiguration,
-                    raftConfiguration.retryTimeout(),
-                    completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+                    metaStorageConfiguration
             );
 
             deployWatchesFut = metaStorageManager.deployWatches();
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 7064e297a9..0637c23cc6 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -438,9 +438,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 hybridClock,
                 topologyAwareRaftGroupServiceFactory,
                 metricManager,
-                metaStorageConfiguration,
-                raftConfiguration.retryTimeout(),
-                maxClockSkewFuture
+                metaStorageConfiguration
         ) {
             @Override
             public CompletableFuture<Boolean> invoke(Condition condition, 
Collection<Operation> success, Collection<Operation> failure) {
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 9ce9ef6e3a..28bcdc769e 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.app;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
@@ -134,6 +135,7 @@ import 
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventPara
 import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.cache.IdempotentCacheVacuumizer;
 import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
@@ -321,6 +323,9 @@ public class IgniteImpl implements Ignite {
     /** Configuration manager that handles cluster (distributed) 
configuration. */
     private final ConfigurationManager clusterCfgMgr;
 
+    /** Idempotent cache vacuumizer. */
+    private final IdempotentCacheVacuumizer idempotentCacheVacuumizer;
+
     /** Cluster initializer. */
     private final ClusterInitializer clusterInitializer;
 
@@ -588,8 +593,6 @@ public class IgniteImpl implements Ignite {
                 raftGroupEventsClientListener
         );
 
-        CompletableFuture<LongSupplier> maxClockSkewMillisFuture = new 
CompletableFuture<>();
-
         metaStorageMgr = new MetaStorageManagerImpl(
                 clusterSvc,
                 cmgMgr,
@@ -598,9 +601,7 @@ public class IgniteImpl implements Ignite {
                 new RocksDbKeyValueStorage(name, 
workDir.resolve(METASTORAGE_DB_PATH), failureProcessor),
                 clock,
                 topologyAwareRaftGroupServiceFactory,
-                metricManager,
-                raftConfiguration.retryTimeout(),
-                maxClockSkewMillisFuture
+                metricManager
         );
 
         this.cfgStorage = new DistributedConfigurationStorage(name, 
metaStorageMgr);
@@ -622,7 +623,18 @@ public class IgniteImpl implements Ignite {
 
         clockService = new ClockServiceImpl(clock, clockWaiter, new 
SameValueLongSupplier(() -> schemaSyncConfig.maxClockSkew().value()));
 
-        maxClockSkewMillisFuture.complete(clockService::maxClockSkewMillis);
+        idempotentCacheVacuumizer = new IdempotentCacheVacuumizer(
+                name,
+                threadPoolsManager.commonScheduler(),
+                metaStorageMgr::evictIdempotentCommandsCache,
+                
nodeCfgMgr.configurationRegistry().getConfiguration(RaftConfiguration.KEY).retryTimeout(),
+                clockService,
+                1,
+                1,
+                MINUTES
+        );
+
+        metaStorageMgr.addElectionListener(idempotentCacheVacuumizer);
 
         Consumer<LongFunction<CompletableFuture<?>>> registry = c -> 
metaStorageMgr.registerRevisionUpdateListener(c::apply);
 
@@ -1114,7 +1126,6 @@ public class IgniteImpl implements Ignite {
 
                     clusterSvc.updateMetadata(
                             new NodeMetadata(restComponent.hostName(), 
restComponent.httpPort(), restComponent.httpsPort()));
-
                 } catch (Throwable e) {
                     startupExecutor.shutdownNow();
 
@@ -1168,6 +1179,7 @@ public class IgniteImpl implements Ignite {
                                 catalogCompactionRunner,
                                 indexMetaStorage,
                                 clusterCfgMgr,
+                                idempotentCacheVacuumizer,
                                 authenticationManager,
                                 placementDriverMgr,
                                 metricManager,
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index b61fb76b24..fcc7294998 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.rebalance;
 
 import static java.util.Collections.reverse;
 import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toSet;
@@ -1174,9 +1173,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     hybridClock,
                     topologyAwareRaftGroupServiceFactory,
                     metricManager,
-                    metaStorageConfiguration,
-                    raftConfiguration.retryTimeout(),
-                    completedFuture(() -> DEFAULT_MAX_CLOCK_SKEW_MS)
+                    metaStorageConfiguration
             );
 
             placementDriver = new TestPlacementDriver(() -> 
PRIMARY_FILTER.apply(clusterService.topologyService().allMembers()));
diff --git a/settings.gradle b/settings.gradle
index 00016e3ec5..a5c995b2b1 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -54,6 +54,7 @@ include(':ignite-runner')
 include(':ignite-index')
 include(':ignite-metastorage-api')
 include(':ignite-metastorage')
+include(':ignite-metastorage-cache')
 include(':ignite-rest-api')
 include(':ignite-storage-rocksdb')
 include(':ignite-configuration-annotation-processor')
@@ -126,6 +127,7 @@ project(":ignite-runner").projectDir = 
file('modules/runner')
 project(":ignite-index").projectDir = file('modules/index')
 project(":ignite-metastorage-api").projectDir = file('modules/metastorage-api')
 project(":ignite-metastorage").projectDir = file('modules/metastorage')
+project(":ignite-metastorage-cache").projectDir = 
file('modules/metastorage-cache')
 project(":ignite-rest-api").projectDir = file('modules/rest-api')
 project(":ignite-storage-rocksdb").projectDir = file('modules/storage-rocksdb')
 project(":ignite-configuration-annotation-processor").projectDir = 
file('modules/configuration-annotation-processor')

Reply via email to