This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new eeac526827 IGNITE-22819 Fix Metastorage revisions inconsistency (#4143) eeac526827 is described below commit eeac5268276251dcaadc04d23c54821a6c4d9806 Author: Alexander Lapin <lapin1...@gmail.com> AuthorDate: Fri Aug 2 15:02:58 2024 +0300 IGNITE-22819 Fix Metastorage revisions inconsistency (#4143) --- .../org/apache/ignite/internal/util/ByteUtils.java | 20 ++ .../DistributionZoneRebalanceEngineTest.java | 13 +- .../RebalanceUtilUpdateAssignmentsTest.java | 14 +- modules/jacoco-report/build.gradle | 1 + modules/metastorage-cache/README.md | 10 + modules/metastorage-cache/build.gradle | 33 +++ .../cache/IdempotentCacheVacuumizer.java | 165 +++++++++++++++ .../cache/IdempotentCacheVacuumizerTest.java | 225 +++++++++++++++++++++ .../impl/ItIdempotentCommandCacheTest.java | 11 +- .../impl/ItMetaStorageManagerImplTest.java | 9 +- .../ItMetaStorageMultipleNodesAbstractTest.java | 6 +- .../impl/ItMetaStorageServicePersistenceTest.java | 9 +- .../metastorage/impl/ItMetaStorageServiceTest.java | 12 +- .../metastorage/impl/ItMetaStorageWatchTest.java | 6 +- .../server/raft/ItMetaStorageRaftGroupTest.java | 25 +-- .../EvictIdempotentCommandsCacheCommand.java | 30 +++ .../command/MetastorageCommandsMessageGroup.java | 3 + .../impl/MetaStorageLeaderElectionListener.java | 11 +- .../metastorage/impl/MetaStorageManagerImpl.java | 70 ++----- .../metastorage/impl/MetaStorageServiceImpl.java | 32 +++ .../metastorage/server/KeyValueStorage.java | 8 + .../server/persistence/RocksDbKeyValueStorage.java | 39 ++-- .../server/raft/MetaStorageListener.java | 23 +-- .../server/raft/MetaStorageWriteHandler.java | 93 ++++----- .../impl/IdempotentCommandCacheTest.java | 14 +- .../MetaStorageDeployWatchesCorrectnessTest.java | 9 +- .../impl/MetaStorageManagerRecoveryTest.java | 9 +- .../server/BasicOperationsKeyValueStorageTest.java | 72 ++++++- .../impl/StandaloneMetaStorageManager.java | 14 +- 
.../server/SimpleInMemoryKeyValueStorage.java | 21 +- .../replicator/ItReplicaLifecycleTest.java | 8 +- .../MultiActorPlacementDriverTest.java | 4 +- .../PlacementDriverManagerTest.java | 4 +- .../service/ItAbstractListenerSnapshotTest.java | 2 +- modules/runner/build.gradle | 2 + .../ItDistributedConfigurationPropertiesTest.java | 6 +- .../ItDistributedConfigurationStorageTest.java | 6 +- .../runner/app/ItIgniteNodeRestartTest.java | 4 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 26 ++- .../rebalance/ItRebalanceDistributedTest.java | 5 +- settings.gradle | 2 + 41 files changed, 756 insertions(+), 320 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java index 8efda24489..86563608ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java @@ -268,6 +268,26 @@ public class ByteUtils { } } + /** + * Deserializes an object from byte array using native java serialization mechanism. + * + * @param bytes Byte array. + * @param from – the offset in the buffer of the first byte to read. + * @param length – the maximum number of bytes to read from the buffer. + * @return Object. + */ + // TODO https://issues.apache.org/jira/browse/IGNITE-22894 Extend test coverage. + public static <T> T fromBytes(byte[] bytes, int from, int length) { + try ( + var bis = new ByteArrayInputStream(bytes, from, length); + var in = new ObjectInputStream(bis) + ) { + return (T) in.readObject(); + } catch (IOException | ClassNotFoundException e) { + throw new IgniteInternalException("Could not deserialize an object", e); + } + } + /** * Converts a string to a byte array using {@link StandardCharsets#UTF_8}, {@code null} if {@code s} is {@code null}. 
* diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java index b08c23f7d6..baf31c1b7f 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java @@ -30,7 +30,6 @@ import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; -import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; @@ -70,7 +69,6 @@ import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.commands.ColumnParams; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; -import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.distributionzones.DistributionZoneManager; import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil; import org.apache.ignite.internal.distributionzones.Node; @@ -99,7 +97,6 @@ import 
org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.TablePartitionId; @@ -140,9 +137,6 @@ public class DistributionZoneRebalanceEngineTest extends IgniteAbstractTest { private CatalogManager catalogManager; - @InjectConfiguration - private RaftConfiguration raftConfiguration; - @BeforeEach public void setUp() { String nodeName = "test"; @@ -184,12 +178,7 @@ public class DistributionZoneRebalanceEngineTest extends IgniteAbstractTest { ClusterTimeImpl clusterTime = new ClusterTimeImpl("node", new IgniteSpinBusyLock(), clock); - MetaStorageListener metaStorageListener = new MetaStorageListener( - keyValueStorage, - clusterTime, - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) - ); + MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage, clusterTime); RaftGroupService metaStorageService = mock(RaftGroupService.class); diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java index e87547b909..0fd9dc6979 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java @@ -17,13 +17,11 @@ package org.apache.ignite.internal.distributionzones.rebalance; -import 
static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition; import static org.apache.ignite.internal.affinity.Assignments.toBytes; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; -import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -46,7 +44,6 @@ import org.apache.ignite.internal.affinity.Assignments; import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; -import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -66,7 +63,6 @@ import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.TablePartitionId; @@ -110,9 +106,6 @@ public class RebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest { private final HybridClock clock = new HybridClockImpl(); - @InjectConfiguration - private RaftConfiguration 
raftConfiguration; - private static final int partNum = 2; private static final int replicas = 2; @@ -138,12 +131,7 @@ public class RebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest { ClusterTimeImpl clusterTime = new ClusterTimeImpl("node", new IgniteSpinBusyLock(), clock); - MetaStorageListener metaStorageListener = new MetaStorageListener( - keyValueStorage, - clusterTime, - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) - ); + MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage, clusterTime); RaftGroupService metaStorageService = mock(RaftGroupService.class); diff --git a/modules/jacoco-report/build.gradle b/modules/jacoco-report/build.gradle index 5260664c55..4ff0312858 100644 --- a/modules/jacoco-report/build.gradle +++ b/modules/jacoco-report/build.gradle @@ -31,6 +31,7 @@ dependencies { implementation project(':ignite-core') implementation project(':ignite-metastorage-api') implementation project(':ignite-metastorage') + implementation project(':ignite-metastorage-cache') implementation project(':ignite-network') implementation project(':ignite-network-api') implementation project(':ignite-raft') diff --git a/modules/metastorage-cache/README.md b/modules/metastorage-cache/README.md new file mode 100644 index 0000000000..0983a81056 --- /dev/null +++ b/modules/metastorage-cache/README.md @@ -0,0 +1,10 @@ +# Metastorage idempotent command cache eviction module + +Module responsible for metastorage idempotent command cache eviction. + +Metastorage idempotent command cache is a mapping of commandId -> command evaluation result that is used to store the results of +the invoke and multi-invoke commands in order not to re-evaluate invoke condition in case of operation retry. By definition it's +necessary to store such results for the command-processing-timeout + max clock skew, thus after the given interval the corresponding cached command +may be evicted.
IdempotentCacheVacuumizer is an actor to trigger such evictions. It would be reasonable to put it inside the metastorage module +itself instead of creating a new one, however it's not possible because of a cyclic dependency. IdempotentCacheVacuumizer requires maxClockSkew +that is stored in distributed configuration, which in its turn requires metastorage. That's why the new module was introduced. \ No newline at end of file diff --git a/modules/metastorage-cache/build.gradle b/modules/metastorage-cache/build.gradle new file mode 100644 index 0000000000..a4ac189796 --- /dev/null +++ b/modules/metastorage-cache/build.gradle @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +apply from: "$rootDir/buildscripts/java-core.gradle" +apply from: "$rootDir/buildscripts/publishing.gradle" +apply from: "$rootDir/buildscripts/java-junit5.gradle" + +description = 'ignite-metastorage-cache' + +dependencies { + implementation project(':ignite-core') + implementation project(':ignite-metastorage') + implementation libs.jetbrains.annotations + + testImplementation testFixtures(project(":ignite-core")) + testImplementation libs.mockito.junit + testImplementation libs.mockito.core + testImplementation libs.hamcrest.core +} diff --git a/modules/metastorage-cache/src/main/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizer.java b/modules/metastorage-cache/src/main/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizer.java new file mode 100644 index 0000000000..bad5415364 --- /dev/null +++ b/modules/metastorage-cache/src/main/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizer.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.metastorage.cache; + +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import org.apache.ignite.configuration.ConfigurationValue; +import org.apache.ignite.internal.hlc.ClockService; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.manager.ComponentContext; +import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.metastorage.impl.ElectionListener; +import org.apache.ignite.network.ClusterNode; + +/** + * Scheduler wrapper that triggers idempotent cache vacuumization with an ability to suspend and resume the triggering. It is valid but not + * effective to have multiple vacuumizers at the same time, meaning that best-effort uniqueness is preferable. In order to achieve such + * best-effort uniqueness it's possible to use meta storage leader collocation: start/resume triggering on leader election if the leader is + * collocated with a local node, and suspend upon loss of collocation with the leader. + * In case of exception within vacuumization action, vacuumizer will just log a warning without suspending the scheduler. + */ +public class IdempotentCacheVacuumizer implements IgniteComponent, ElectionListener { + private static final IgniteLogger LOG = Loggers.forClass(IdempotentCacheVacuumizer.class); + + private final AtomicBoolean triggerVacuumization; + + private final String nodeName; + + /** Scheduler to run vacuumization actions. 
*/ + private final ScheduledExecutorService scheduler; + + /** Action that will trigger vacuumization process. */ + private final Consumer<HybridTimestamp> vacuumizationAction; + + /** Idempotent cache ttl. */ + private final ConfigurationValue<Long> idempotentCacheTtl; + + /** Clock service. */ + private final ClockService clockService; + + /** The time to delay first execution. */ + private final long initialDelay; + + /** The delay between the termination of one execution and the commencement of the next. */ + private final long delay; + + /** The time unit of the initialDelay and delay parameters. */ + private final TimeUnit unit; + + /** Vacuumization task future. */ + private volatile ScheduledFuture<?> scheduledFuture; + + /** + * The constructor. + * + * @param nodeName Node name. + * @param scheduler Scheduler to run vacuumization actions. + * @param vacuumizationAction Action that will trigger vacuumization process. + * @param idempotentCacheTtl Idempotent cache ttl. + * @param clockService Clock service. + * @param initialDelay The time to delay first execution. + * @param delay The delay between the termination of one execution and the commencement of the next. + * @param unit The time unit of the initialDelay and delay parameters. 
+ */ + public IdempotentCacheVacuumizer( + String nodeName, + ScheduledExecutorService scheduler, + Consumer<HybridTimestamp> vacuumizationAction, + ConfigurationValue<Long> idempotentCacheTtl, + ClockService clockService, + long initialDelay, + long delay, + TimeUnit unit + ) { + this.nodeName = nodeName; + this.triggerVacuumization = new AtomicBoolean(false); + this.scheduler = scheduler; + this.vacuumizationAction = vacuumizationAction; + this.idempotentCacheTtl = idempotentCacheTtl; + this.clockService = clockService; + this.initialDelay = initialDelay; + this.delay = delay; + this.unit = unit; + } + + @Override + public CompletableFuture<Void> startAsync(ComponentContext componentContext) { + scheduledFuture = scheduler.scheduleWithFixedDelay( + () -> { + if (triggerVacuumization.get()) { + try { + vacuumizationAction.accept(hybridTimestamp(clockService.nowLong() + - (idempotentCacheTtl.value() + clockService.maxClockSkewMillis()))); + } catch (Exception e) { + LOG.warn("An exception occurred while executing idempotent cache vacuumization action." + + " Idempotent cache vacuumizer won't be stopped.", e); + } + } + }, + initialDelay, + delay, + unit + ); + + return nullCompletedFuture(); + } + + @Override + public CompletableFuture<Void> stopAsync(ComponentContext componentContext) { + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + } + + return nullCompletedFuture(); + } + + @Override + public void onLeaderElected(ClusterNode newLeader) { + if (newLeader.name().equals(nodeName)) { + startLocalVacuumizationTriggering(); + } else { + suspendLocalVacuumizationTriggering(); + } + } + + /** + * Starts local vacuumization triggering. Will take no effect if vacuumizer was previously stopped. + */ + void startLocalVacuumizationTriggering() { + triggerVacuumization.set(true); + LOG.info("Idempotent cache vacuumizer started."); + } + + /** + * Suspends further local vacuumization triggering. Will take no effect if vacuumizer was previously stopped. 
+ */ + void suspendLocalVacuumizationTriggering() { + triggerVacuumization.set(false); + LOG.info("Idempotent cache vacuumizer suspended."); + } +} diff --git a/modules/metastorage-cache/src/test/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizerTest.java b/modules/metastorage-cache/src/test/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizerTest.java new file mode 100644 index 0000000000..31c29ef938 --- /dev/null +++ b/modules/metastorage-cache/src/test/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizerTest.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.metastorage.cache; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import org.apache.ignite.configuration.ConfigurationValue; +import org.apache.ignite.internal.TestHybridClock; +import org.apache.ignite.internal.hlc.ClockService; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.hlc.TestClockService; +import org.apache.ignite.internal.manager.ComponentContext; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockSettings; +import org.mockito.quality.Strictness; + +/** + * Tests for idempotency of {@link IdempotentCacheVacuumizer}. 
+ */ +public class IdempotentCacheVacuumizerTest extends BaseIgniteAbstractTest { + private static final int TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS = 1_000; + + private static final MockSettings LENIENT_SETTINGS = withSettings().strictness(Strictness.LENIENT); + + private ScheduledExecutorService scheduler; + + private ClockService clocService; + + private ConfigurationValue<Long> idempotentCacheTtlConfigurationValue; + + private IdempotentCacheVacuumizer vacuumizer; + + @BeforeEach + public void setup() { + scheduler = Executors.newSingleThreadScheduledExecutor(); + clocService = new TestClockService(new TestHybridClock(() -> 1L)); + + idempotentCacheTtlConfigurationValue = mock(ConfigurationValue.class, LENIENT_SETTINGS); + when(idempotentCacheTtlConfigurationValue.value()).thenReturn(0L); + } + + @AfterEach + public void tearDown() { + if (vacuumizer != null) { + vacuumizer.beforeNodeStop(); + assertThat(vacuumizer.stopAsync(), willCompleteSuccessfully()); + } + + scheduler.shutdown(); + } + + /** + * Check that IdempotentCacheVacuumizer triggers vacuumization action. + * <ol> + * <li>Ensure that until starting, vacuumizer will not trigger the vacuumization action.</li> + * <li>Start vacuumization triggering and verify that vacuumization action was called.</li> + * <li>Suspend vacuumization triggering and verify that vacuumization action calls were suspended.</li> + * <li>Start vacuumization triggering and verify that vacuumization action was called.</li> + * </ol> + * + * @throws Exception if Thread.sleep() was interrupted. 
+ */ + @Test + public void testIdempotentCacheVacuumizer() throws Exception { + AtomicInteger touchCounter = new AtomicInteger(0); + + IdempotentCacheVacuumizer vacuumizer = new IdempotentCacheVacuumizer( + "Node1", + scheduler, + ignored -> touchCounter.incrementAndGet(), + idempotentCacheTtlConfigurationValue, + clocService, + 0, + 1, + TimeUnit.MILLISECONDS + ); + + assertThat(vacuumizer.startAsync(new ComponentContext()), willCompleteSuccessfully()); + + // Ensure that until starting, vacuumizer will not trigger the vacuumization action. It's a best-effort check. + Thread.sleep(10); + assertEquals(0, touchCounter.get()); + + // Start vacuumization triggering and verify that vacuumization action was called. + vacuumizer.startLocalVacuumizationTriggering(); + assertTrue(waitForCondition( + () -> touchCounter.get() > 0, + TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS) + ); + + // Suspend vacuumization triggering and verify that vacuumization action calls were suspended. + vacuumizer.suspendLocalVacuumizationTriggering(); + int touchCounterAfterStopTriggered = touchCounter.get(); + assertTrue(waitForCondition( + () -> touchCounter.get() == touchCounterAfterStopTriggered || touchCounter.get() == touchCounterAfterStopTriggered + 1, + TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS) + ); + + // Start vacuumization triggering and verify that vacuumization action was called. + vacuumizer.startLocalVacuumizationTriggering(); + assertTrue(waitForCondition( + () -> touchCounter.get() > touchCounterAfterStopTriggered + 1, + TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS) + ); + } + + /** + * Check that IdempotentCacheVacuumizer doesn't trigger vacuumization action after shutdown. 
+ * <ol> + * <li>Start vacuumization triggering and verify that vacuumization action was called.</li> + * <li>Shutdown the vacuumizer scheduler and check that action calls were stopped.</li> + * <li>Start the vacuumizer and check that it doesn't take any effect.</li> + * <li>Suspend vacuumization triggering and check that it doesn't take any effect.</li> + * </ol> + * + * @throws Exception if Thread.sleep() was interrupted. + */ + @Test + public void testIdempotentCacheVacuumizerAfterShutdown() throws Exception { + AtomicInteger touchCounter = new AtomicInteger(0); + + IdempotentCacheVacuumizer vacuumizer = new IdempotentCacheVacuumizer( + "Node1", + scheduler, + ignored -> touchCounter.incrementAndGet(), + idempotentCacheTtlConfigurationValue, + clocService, + 0, + 1, + TimeUnit.MILLISECONDS + ); + + assertThat(vacuumizer.startAsync(new ComponentContext()), willCompleteSuccessfully()); + + // Start vacuumization triggering and verify that vacuumization action was called. + vacuumizer.startLocalVacuumizationTriggering(); + assertTrue(waitForCondition( + () -> touchCounter.get() > 0, + TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS) + ); + + // Shutdown the vacuumizer scheduler and check that action calls were stopped. + scheduler.shutdown(); + int touchCounterAfterShutdown = touchCounter.get(); + Thread.sleep(10); + assertTrue(touchCounter.get() == touchCounterAfterShutdown || touchCounter.get() == touchCounterAfterShutdown + 1); + + // Start the vacuumizer and check that it doesn't take any effect. + vacuumizer.startLocalVacuumizationTriggering(); + Thread.sleep(10); + assertTrue(touchCounter.get() == touchCounterAfterShutdown || touchCounter.get() == touchCounterAfterShutdown + 1); + + // Suspend vacuumization triggering and check that it doesn't take any effect. 
+ vacuumizer.suspendLocalVacuumizationTriggering(); + Thread.sleep(10); + assertTrue(touchCounter.get() == touchCounterAfterShutdown || touchCounter.get() == touchCounterAfterShutdown + 1); + } + + /** + * Check that IdempotentCacheVacuumizer doesn't stop on an exception in the vacuumization action and doesn't re-throw it to the outer + * environment but logs an exception with WARN. + * + * @throws Exception if Thread.sleep() was interrupted. + */ + @Test + public void testIdempotentCacheExceptionHandling() throws Exception { + AtomicInteger touchCounter = new AtomicInteger(0); + + Consumer<HybridTimestamp> vacuumizationActionStub = ignored -> { + touchCounter.incrementAndGet(); + throw new IllegalStateException(); + }; + + IdempotentCacheVacuumizer vacuumizer = new IdempotentCacheVacuumizer( + "Node1", + scheduler, + vacuumizationActionStub, + idempotentCacheTtlConfigurationValue, + clocService, + 0, + 1, + TimeUnit.MILLISECONDS + ); + + assertThat(vacuumizer.startAsync(new ComponentContext()), willCompleteSuccessfully()); + + // Start vacuumization triggering and verify that vacuumization actions were not stopped after exception.
+ vacuumizer.startLocalVacuumizationTriggering(); + + assertTrue(waitForCondition( + () -> touchCounter.get() > 1, + TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS) + ); + } +} diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java index da3cf1b836..7c5b0d18bb 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.metastorage.impl; -import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; @@ -196,9 +195,7 @@ public class ItIdempotentCommandCacheTest extends IgniteAbstractTest { clock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration, - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + metaStorageConfiguration ); clockWaiter = new ClockWaiter(clusterService.nodeName(), clock); @@ -415,10 +412,10 @@ public class ItIdempotentCommandCacheTest extends IgniteAbstractTest { ), willCompleteSuccessfully()); } - for (Node node : nodes) { - node.metaStorageManager.evictIdempotentCommandsCache(); - } + HybridTimestamp evictionTimestamp = HybridTimestamp.hybridTimestamp(nodes.get(0).clockService.nowLong() + - (raftConfiguration.retryTimeout().value() + nodes.get(0).clockService.maxClockSkewMillis())); + assertThat(nodes.get(0).metaStorageManager.evictIdempotentCommandsCache(evictionTimestamp), willCompleteSuccessfully()); // 
Run same idempotent command one more time and check that condition **was** re-evaluated and not retrieved from the cache. CompletableFuture<Object> commandProcessingResultFuture3 = raftClient().run(idempotentCommand); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java index 8275484bf8..3c8a05542c 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.metastorage.impl; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; -import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService; import static org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast; @@ -146,9 +145,7 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { clock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration, - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + metaStorageConfiguration ); assertThat( @@ -235,9 +232,7 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { storage, new HybridClockImpl(), mock(TopologyAwareRaftGroupServiceFactory.class), - new NoOpMetricManager(), - raftConfiguration.retryTimeout(), - completedFuture(() -> 
TEST_MAX_CLOCK_SKEW_MILLIS) + new NoOpMetricManager() ); assertThat(metaStorageManager.stopAsync(new ComponentContext()), willCompleteSuccessfully()); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java index d6b4941299..dd5d620b4b 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java @@ -18,9 +18,7 @@ package org.apache.ignite.internal.metastorage.impl; import static java.util.concurrent.CompletableFuture.allOf; -import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toSet; -import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision; import static org.apache.ignite.internal.metastorage.dsl.Operations.noop; @@ -205,9 +203,7 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr clock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration, - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + metaStorageConfiguration ); deployWatchesFut = metaStorageManager.deployWatches(); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java index 366a103993..ccf4eaffd0 
100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.metastorage.impl; -import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.junit.jupiter.api.Assertions.assertEquals; import java.nio.charset.StandardCharsets; @@ -160,12 +158,7 @@ public class ItMetaStorageServicePersistenceTest extends ItAbstractListenerSnaps return s; }); - return new MetaStorageListener( - storage, - new ClusterTimeImpl(nodeName, new IgniteSpinBusyLock(), new HybridClockImpl()), - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) - ); + return new MetaStorageListener(storage, new ClusterTimeImpl(nodeName, new IgniteSpinBusyLock(), new HybridClockImpl())); } /** {@inheritDoc} */ diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java index 3ab7ebf264..559e617ee7 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java @@ -17,10 +17,8 @@ package org.apache.ignite.internal.metastorage.impl; -import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toUnmodifiableSet; -import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static 
org.apache.ignite.internal.metastorage.dsl.Conditions.and; import static org.apache.ignite.internal.metastorage.dsl.Conditions.or; import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision; @@ -189,11 +187,8 @@ public class ItMetaStorageServiceTest extends BaseIgniteAbstractTest { private MetaStorageService metaStorageService; - private RaftConfiguration raftConfiguration; - Node(ClusterService clusterService, RaftConfiguration raftConfiguration, Path dataPath) { this.clusterService = clusterService; - this.raftConfiguration = raftConfiguration; HybridClock clock = new HybridClockImpl(); @@ -239,12 +234,7 @@ public class ItMetaStorageServiceTest extends BaseIgniteAbstractTest { assert peer != null; - var listener = new MetaStorageListener( - mockStorage, - clusterTime, - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) - ); + var listener = new MetaStorageListener(mockStorage, clusterTime); var raftNodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, peer); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java index 8bcaf65ecb..05b7c8f1b5 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java @@ -17,10 +17,8 @@ package org.apache.ignite.internal.metastorage.impl; -import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; -import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses; import 
static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; @@ -195,9 +193,7 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest { clock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration, - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + metaStorageConfiguration ); components.add(metaStorageManager); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java index 50894e927e..e1ce03451d 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java @@ -17,10 +17,8 @@ package org.apache.ignite.internal.metastorage.server.raft; -import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toSet; -import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses; import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.waitForTopology; import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults; @@ -83,6 +81,7 @@ import org.apache.ignite.raft.jraft.option.NodeOptions; import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import 
org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; @@ -221,6 +220,7 @@ public class ItMetaStorageRaftGroupTest extends IgniteAbstractTest { * @throws Exception If failed. */ @Test + @Disabled("https://issues.apache.org/jira/browse/IGNITE-22891") public void testRangeNextWorksCorrectlyAfterLeaderChange() throws Exception { AtomicInteger replicatorStartedCounter = new AtomicInteger(0); @@ -411,12 +411,7 @@ public class ItMetaStorageRaftGroupTest extends IgniteAbstractTest { metaStorageRaftSrv1.startRaftNode( raftNodeId1, membersConfiguration, - new MetaStorageListener( - mockStorage, - mock(ClusterTimeImpl.class), - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) - ), + new MetaStorageListener(mockStorage, mock(ClusterTimeImpl.class)), defaults() ); @@ -425,12 +420,7 @@ public class ItMetaStorageRaftGroupTest extends IgniteAbstractTest { metaStorageRaftSrv2.startRaftNode( raftNodeId2, membersConfiguration, - new MetaStorageListener( - mockStorage, - mock(ClusterTimeImpl.class), - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) - ), + new MetaStorageListener(mockStorage, mock(ClusterTimeImpl.class)), defaults() ); @@ -439,12 +429,7 @@ public class ItMetaStorageRaftGroupTest extends IgniteAbstractTest { metaStorageRaftSrv3.startRaftNode( raftNodeId3, membersConfiguration, - new MetaStorageListener( - mockStorage, - mock(ClusterTimeImpl.class), - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) - ), + new MetaStorageListener(mockStorage, mock(ClusterTimeImpl.class)), defaults() ); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/EvictIdempotentCommandsCacheCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/EvictIdempotentCommandsCacheCommand.java new file mode 100644 index 0000000000..21d6bfbacd --- 
/dev/null +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/EvictIdempotentCommandsCacheCommand.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.command; + +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.network.annotations.Transferable; + +/** + * Removes obsolete entries from both volatile and persistent idempotent command cache. + */ +@Transferable(MetastorageCommandsMessageGroup.EVICT_IDEMPOTENT_COMMAND_CACHE) +public interface EvictIdempotentCommandsCacheCommand extends MetaStorageWriteCommand { + /** Cached entries older than given timestamp will be evicted. 
*/ + HybridTimestamp evictionTimestamp(); +} diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java index 0967b84d5c..819d02717d 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java @@ -61,4 +61,7 @@ public interface MetastorageCommandsMessageGroup { /** Message type for {@link SyncTimeCommand}. */ short SYNC_TIME = 70; + + /** Message type for {@link EvictIdempotentCommandsCacheCommand}. */ + short EVICT_IDEMPOTENT_COMMAND_CACHE = 71; } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java index 016f6c0038..4a493c228f 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java @@ -118,11 +118,12 @@ public class MetaStorageLeaderElectionListener implements LeaderElectionListener logicalTopologyService.addEventListener(logicalTopologyEventListener); metaStorageSvcFut - .thenAcceptBoth(metaStorageConfigurationFuture, (service, metaStorageConfiguration) -> - clusterTime.startSafeTimeScheduler( - safeTime -> service.syncTime(safeTime, term), - metaStorageConfiguration - )) + .thenAcceptBoth(metaStorageConfigurationFuture, (service, metaStorageConfiguration) -> { + clusterTime.startSafeTimeScheduler( + safeTime -> service.syncTime(safeTime, term), + metaStorageConfiguration + ); + }) 
.whenComplete((v, e) -> { if (e != null) { LOG.error("Unable to start Idle Safe Time scheduler", e); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index 4a388c45fa..30ac53b403 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.metastorage.impl; import static java.util.concurrent.CompletableFuture.failedFuture; -import static java.util.concurrent.TimeUnit.MINUTES; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; @@ -29,13 +28,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executors; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.LongSupplier; -import org.apache.ignite.configuration.ConfigurationValue; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.hlc.HybridClock; @@ -72,7 +67,6 @@ import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import 
org.apache.ignite.internal.raft.service.RaftGroupService; -import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; @@ -137,17 +131,10 @@ public class MetaStorageManagerImpl implements MetaStorageManager { private volatile MetaStorageConfiguration metaStorageConfiguration; - private final ConfigurationValue<Long> idempotentCacheTtl; - - private final CompletableFuture<LongSupplier> maxClockSkewMillisFuture; - private volatile MetaStorageListener followerListener; private volatile MetaStorageListener learnerListener; - // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Remove, cache eviction should be triggered by MS GC instead. - private final ScheduledExecutorService idempotentCacheVacumizer; - private final List<ElectionListener> electionListeners = new CopyOnWriteArrayList<>(); /** @@ -160,7 +147,6 @@ public class MetaStorageManagerImpl implements MetaStorageManager { * @param storage Storage. This component owns this resource and will manage its lifecycle. * @param clock A hybrid logical clock. * @param metricManager Metric manager. - * @param maxClockSkewMillisFuture Future with maximum clock skew in milliseconds. 
*/ public MetaStorageManagerImpl( ClusterService clusterService, @@ -170,9 +156,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { KeyValueStorage storage, HybridClock clock, TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory, - MetricManager metricManager, - ConfigurationValue<Long> idempotentCacheTtl, - CompletableFuture<LongSupplier> maxClockSkewMillisFuture + MetricManager metricManager ) { this.clusterService = clusterService; this.raftMgr = raftMgr; @@ -183,10 +167,6 @@ public class MetaStorageManagerImpl implements MetaStorageManager { this.metaStorageMetricSource = new MetaStorageMetricSource(clusterTime); this.topologyAwareRaftGroupServiceFactory = topologyAwareRaftGroupServiceFactory; this.metricManager = metricManager; - this.idempotentCacheTtl = idempotentCacheTtl; - this.maxClockSkewMillisFuture = maxClockSkewMillisFuture; - this.idempotentCacheVacumizer = Executors.newSingleThreadScheduledExecutor( - NamedThreadFactory.create(clusterService.nodeName(), "idempotent-cache-vacumizer", LOG)); } /** @@ -202,9 +182,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { HybridClock clock, TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory, MetricManager metricManager, - MetaStorageConfiguration configuration, - ConfigurationValue<Long> idempotentCacheTtl, - CompletableFuture<LongSupplier> maxClockSkewMillisFuture + MetaStorageConfiguration configuration ) { this( clusterService, @@ -214,9 +192,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { storage, clock, topologyAwareRaftGroupServiceFactory, - metricManager, - idempotentCacheTtl, - maxClockSkewMillisFuture + metricManager ); configure(configuration); @@ -338,12 +314,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { assert localMetaStorageConfiguration != null : "Meta Storage configuration has not been set"; - followerListener = new MetaStorageListener( - storage, - 
clusterTime, - idempotentCacheTtl, - maxClockSkewMillisFuture - ); + followerListener = new MetaStorageListener(storage, clusterTime); CompletableFuture<TopologyAwareRaftGroupService> raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture( new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer), @@ -387,12 +358,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { assert localPeer != null; - learnerListener = new MetaStorageListener( - storage, - clusterTime, - idempotentCacheTtl, - maxClockSkewMillisFuture - ); + learnerListener = new MetaStorageListener(storage, clusterTime); return raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture( new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer), @@ -457,8 +423,6 @@ public class MetaStorageManagerImpl implements MetaStorageManager { busyLock.block(); - idempotentCacheVacumizer.shutdownNow(); - deployWatchesFuture.cancel(true); recoveryFinishedFuture.cancel(true); @@ -523,8 +487,6 @@ public class MetaStorageManagerImpl implements MetaStorageManager { MetaStorageManagerImpl.this.onRevisionApplied(revision); } }); - - idempotentCacheVacumizer.scheduleWithFixedDelay(this::evictIdempotentCommandsCache, 1, 1, MINUTES); })) .whenComplete((v, e) -> { if (e == null) { @@ -912,16 +874,20 @@ public class MetaStorageManagerImpl implements MetaStorageManager { } /** - * Removes obsolete entries from both volatile and persistent idempotent command cache. + * Removes obsolete entries from both volatile and persistent idempotent command cache older than evictionTimestamp. + * + * @param evictionTimestamp Cached entries older than given timestamp will be evicted. + * @return Pending operation future. */ - @Deprecated(forRemoval = true) - // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 cache eviction should be triggered by MS GC instead. 
- public void evictIdempotentCommandsCache() { - if (followerListener != null) { - followerListener.evictIdempotentCommandsCache(); - } - if (learnerListener != null) { - learnerListener.evictIdempotentCommandsCache(); + public CompletableFuture<Void> evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp) { + if (!busyLock.enterBusy()) { + return failedFuture(new NodeStoppingException()); + } + + try { + return metaStorageSvcFut.thenCompose(svc -> svc.evictIdempotentCommandsCache(evictionTimestamp)); + } finally { + busyLock.leaveBusy(); } } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java index 86b323f4e8..3025895273 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.command.EvictIdempotentCommandsCacheCommand; import org.apache.ignite.internal.metastorage.command.GetAllCommand; import org.apache.ignite.internal.metastorage.command.GetCommand; import org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand; @@ -268,6 +269,22 @@ public class MetaStorageServiceImpl implements MetaStorageService { return context.raftService().run(cmd); } + /** + * Removes obsolete entries from both volatile and persistent idempotent command cache. + * + * @param evictionTimestamp Cached entries older than given timestamp will be evicted. + * @return Pending operation future. 
+ */ + CompletableFuture<Void> evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp) { + EvictIdempotentCommandsCacheCommand evictIdempotentCommandsCacheCommand = evictIdempotentCommandsCacheCommand( + context.commandsFactory(), + evictionTimestamp, + clusterTime.now() + ); + + return context.raftService().run(evictIdempotentCommandsCacheCommand); + } + @Override public void close() { context.close(); @@ -332,4 +349,19 @@ public class MetaStorageServiceImpl implements MetaStorageService { return commandsFactory.removeAllCommand().keys(list).initiatorTime(ts).build(); } + + /** + * Creates evict idempotent commands cache command. + * + * @param commandsFactory Commands factory. + * @param evictionTimestamp Cached entries older than given timestamp will be evicted. + * @param ts Local time. + */ + private EvictIdempotentCommandsCacheCommand evictIdempotentCommandsCacheCommand( + MetaStorageCommandsFactory commandsFactory, + HybridTimestamp evictionTimestamp, + HybridTimestamp ts + ) { + return commandsFactory.evictIdempotentCommandsCacheCommand().evictionTimestamp(evictionTimestamp).initiatorTime(ts).build(); + } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java index 3a9ca2a993..3159069f15 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java @@ -270,6 +270,14 @@ public interface KeyValueStorage extends ManuallyCloseable { */ HybridTimestamp timestampByRevision(long revision); + /** + * Looks a revision lesser or equal to the timestamp. + * + * @param timestamp Timestamp by which to do a lookup. + * @return Revision lesser or equal to the timestamp or -1 if there is no such revision. 
+ */ + long revisionByTimestamp(HybridTimestamp timestamp); + /** * Sets the revision listener. This is needed only for the recovery, after that listener must be set to {@code null}. * {@code null} means that we no longer must be notified of revision updates for recovery, because recovery is finished. diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java index d58efd4256..713a85951f 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java @@ -977,23 +977,12 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { try (WriteBatch batch = new WriteBatch()) { - byte[] tsBytes = hybridTsToArray(lowWatermark); - long maxRevision; - // Find a revision with timestamp lesser or equal to the watermark. - try (RocksIterator rocksIterator = tsToRevision.newIterator()) { - rocksIterator.seekForPrev(tsBytes); - - RocksUtils.checkIterator(rocksIterator); - - byte[] tsValue = rocksIterator.value(); - - if (tsValue.length == 0) { - // Nothing to compact yet. - return; - } + long maxRevision = revisionByTimestamp(lowWatermark); - maxRevision = bytesToLong(tsValue); + if (maxRevision == -1) { + // Nothing to compact yet. + return; } try (RocksIterator iterator = index.newIterator()) { @@ -1520,6 +1509,26 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { } } + @Override + public long revisionByTimestamp(HybridTimestamp timestamp) { + byte[] tsBytes = hybridTsToArray(timestamp); + + // Find a revision with timestamp lesser or equal to the watermark. 
+ try (RocksIterator rocksIterator = tsToRevision.newIterator()) { + rocksIterator.seekForPrev(tsBytes); + + RocksUtils.checkIterator(rocksIterator); + + byte[] tsValue = rocksIterator.value(); + + if (tsValue.length == 0) { + return -1; + } + + return bytesToLong(tsValue); + } + } + private void finishReplay() { // Take the lock to drain the event cache and prevent new events from being cached. Since event notification is asynchronous, // this lock shouldn't be held for long. diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java index 5659e37227..8e4780e58b 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java @@ -26,10 +26,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; -import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; -import java.util.function.LongSupplier; -import org.apache.ignite.configuration.ConfigurationValue; import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.command.GetAllCommand; @@ -67,17 +64,10 @@ public class MetaStorageListener implements RaftGroupListener, BeforeApplyHandle */ public MetaStorageListener( KeyValueStorage storage, - ClusterTimeImpl clusterTime, - ConfigurationValue<Long> idempotentCacheTtl, - CompletableFuture<LongSupplier> maxClockSkewMillisFuture + ClusterTimeImpl clusterTime ) { this.storage = storage; - this.writeHandler = new MetaStorageWriteHandler( - storage, - clusterTime, - idempotentCacheTtl, - maxClockSkewMillisFuture - ); + this.writeHandler = new 
MetaStorageWriteHandler(storage, clusterTime); } @Override @@ -191,13 +181,4 @@ public class MetaStorageListener implements RaftGroupListener, BeforeApplyHandle @Override public void onShutdown() { } - - /** - * Removes obsolete entries from both volatile and persistent idempotent command cache. - */ - @Deprecated(forRemoval = true) - // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 cache eviction should be triggered by MS GC instead. - public void evictIdempotentCommandsCache() { - writeHandler.evictIdempotentCommandsCache(); - } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java index bc226520f3..e63b29fff5 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java @@ -28,17 +28,15 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.LongSupplier; -import org.apache.ignite.configuration.ConfigurationValue; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.CommandId; import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.command.EvictIdempotentCommandsCacheCommand; import org.apache.ignite.internal.metastorage.command.IdempotentCommand; import 
org.apache.ignite.internal.metastorage.command.InvokeCommand; import org.apache.ignite.internal.metastorage.command.MetaStorageWriteCommand; @@ -69,7 +67,6 @@ import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.service.CommandClosure; -import org.apache.ignite.internal.util.ArrayUtils; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.util.Cursor; import org.jetbrains.annotations.Nullable; @@ -88,22 +85,14 @@ public class MetaStorageWriteHandler { private final KeyValueStorage storage; private final ClusterTimeImpl clusterTime; - private final Map<CommandId, IdempotentCommandCachedResult> idempotentCommandCache = new ConcurrentHashMap<>(); - - private final ConfigurationValue<Long> idempotentCacheTtl; - - private final CompletableFuture<LongSupplier> maxClockSkewMillisFuture; + private final Map<CommandId, @Nullable Serializable> idempotentCommandCache = new ConcurrentHashMap<>(); MetaStorageWriteHandler( KeyValueStorage storage, - ClusterTimeImpl clusterTime, - ConfigurationValue<Long> idempotentCacheTtl, - CompletableFuture<LongSupplier> maxClockSkewMillisFuture + ClusterTimeImpl clusterTime ) { this.storage = storage; this.clusterTime = clusterTime; - this.idempotentCacheTtl = idempotentCacheTtl; - this.maxClockSkewMillisFuture = maxClockSkewMillisFuture; } /** @@ -118,10 +107,10 @@ public class MetaStorageWriteHandler { IdempotentCommand idempotentCommand = ((IdempotentCommand) command); CommandId commandId = idempotentCommand.id(); - IdempotentCommandCachedResult cachedResult = idempotentCommandCache.get(commandId); + Serializable cachedResult = idempotentCommandCache.get(commandId); if (cachedResult != null) { - clo.result(cachedResult.result); + clo.result(cachedResult); return; } else { @@ -215,6 +204,11 @@ public class MetaStorageWriteHandler { } else if (command 
instanceof SyncTimeCommand) { storage.advanceSafeTime(command.safeTime()); + clo.result(null); + } else if (command instanceof EvictIdempotentCommandsCacheCommand) { + EvictIdempotentCommandsCacheCommand cmd = (EvictIdempotentCommandsCacheCommand) command; + evictIdempotentCommandsCache(cmd.evictionTimestamp(), opTime); + clo.result(null); } } @@ -352,8 +346,6 @@ public class MetaStorageWriteHandler { byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES); Cursor<Entry> cursor = storage.range(keyFrom, keyTo); - // It's fine to lose original command start time - in that case we will store the entry a little bit longer that necessary. - HybridTimestamp now = clusterTime.now(); try (cursor) { for (Entry entry : cursor) { @@ -368,7 +360,7 @@ public class MetaStorageWriteHandler { result = MSG_FACTORY.statementResult().result(ByteBuffer.wrap(entry.value())).build(); } - idempotentCommandCache.put(commandId, new IdempotentCommandCachedResult(result, now)); + idempotentCommandCache.put(commandId, result); } } } @@ -376,48 +368,45 @@ public class MetaStorageWriteHandler { /** * Removes obsolete entries from both volatile and persistent idempotent command cache. + * + * @param evictionTimestamp Cached entries older than given timestamp will be evicted. + * @param operationTimestamp Command operation timestamp. */ - // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Call on meta storage compaction. 
- void evictIdempotentCommandsCache() { - HybridTimestamp cleanupTimestamp = clusterTime.now(); - LOG.info("Idempotent command cache cleanup started [cleanupTimestamp={}].", cleanupTimestamp); - - maxClockSkewMillisFuture.thenAccept(maxClockSkewMillis -> { - List<CommandId> commandIdsToRemove = idempotentCommandCache.entrySet().stream() - .filter(entry -> entry.getValue().commandStartTime.getPhysical() - <= cleanupTimestamp.getPhysical() - (idempotentCacheTtl.value() + maxClockSkewMillis.getAsLong())) - .map(Map.Entry::getKey) + void evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp, HybridTimestamp operationTimestamp) { + LOG.info("Idempotent command cache cleanup started [evictionTimestamp={}].", evictionTimestamp); + + long obsoleteRevision = storage.revisionByTimestamp(evictionTimestamp); + + if (obsoleteRevision != -1) { + byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES; + byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES); + + List<byte[]> evictionCandidateKeys = storage.range(keyFrom, keyTo, obsoleteRevision).stream() + // Not sure whether it's possible to retrieve empty entry here, thus !entry.empty() was added just in case. 
+ .filter(entry -> !entry.tombstone() && !entry.empty()) + .map(Entry::key) .collect(toList()); - if (!commandIdsToRemove.isEmpty()) { - List<byte[]> commandIdStorageKeys = commandIdsToRemove.stream() - .map(commandId -> ArrayUtils.concat(new byte[]{}, ByteUtils.toBytes(commandId))) - .collect(toList()); + // TODO https://issues.apache.org/jira/browse/IGNITE-22828 + evictionCandidateKeys.forEach(evictionCandidateKeyBytes -> { + CommandId commandId = ByteUtils.fromBytes( + evictionCandidateKeyBytes, + IDEMPOTENT_COMMAND_PREFIX_BYTES.length, + evictionCandidateKeyBytes.length + ); - storage.removeAll(commandIdStorageKeys, null); + idempotentCommandCache.remove(commandId); + }); - commandIdsToRemove.forEach(idempotentCommandCache.keySet()::remove); - } + storage.removeAll(evictionCandidateKeys, operationTimestamp); - LOG.info("Idempotent command cache cleanup finished [cleanupTimestamp={}, cleanupCompletionTimestamp={}," + LOG.info("Idempotent command cache cleanup finished [evictionTimestamp={}, cleanupCompletionTimestamp={}," + " removedEntriesCount={}, cacheSize={}].", - cleanupTimestamp, + evictionTimestamp, clusterTime.now(), - commandIdsToRemove.size(), + evictionCandidateKeys.size(), idempotentCommandCache.size() ); - }); - } - - private static class IdempotentCommandCachedResult { - @Nullable - final Serializable result; - - final HybridTimestamp commandStartTime; - - IdempotentCommandCachedResult(@Nullable Serializable result, HybridTimestamp commandStartTime) { - this.result = result; - this.commandStartTime = commandStartTime; } } @@ -451,7 +440,7 @@ public class MetaStorageWriteHandler { // Exceptions are not cached. 
if (!(res instanceof Throwable)) { - idempotentCommandCache.put(command.id(), new IdempotentCommandCachedResult(res, command.initiatorTime())); + idempotentCommandCache.put(command.id(), res); } closure.result(res); diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java index b79593ad5e..f41319401a 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.metastorage.impl; -import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; import static org.apache.ignite.internal.metastorage.dsl.Operations.ops; import static org.apache.ignite.internal.metastorage.dsl.Operations.put; @@ -36,7 +34,6 @@ import java.util.Iterator; import java.util.List; import java.util.UUID; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; -import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.lang.ByteArray; @@ -52,7 +49,6 @@ import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStora import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import 
org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.util.IgniteSpinBusyLock; @@ -81,18 +77,10 @@ public class IdempotentCommandCacheTest extends BaseIgniteAbstractTest { private final CommandIdGenerator commandIdGenerator = new CommandIdGenerator(() -> UUID.randomUUID().toString()); - @InjectConfiguration - private RaftConfiguration raftConfiguration; - @BeforeEach public void setUp() { storage = new SimpleInMemoryKeyValueStorage(NODE_NAME); - metaStorageListener = new MetaStorageListener( - storage, - new ClusterTimeImpl(NODE_NAME, new IgniteSpinBusyLock(), clock), - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) - ); + metaStorageListener = new MetaStorageListener(storage, new ClusterTimeImpl(NODE_NAME, new IgniteSpinBusyLock(), clock)); } @Test diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java index c548f0d734..d6fc9c0431 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.metastorage.impl; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -44,7 +43,6 @@ import org.apache.ignite.internal.network.ClusterService; 
import org.apache.ignite.internal.raft.RaftManager; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; -import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -58,9 +56,6 @@ public class MetaStorageDeployWatchesCorrectnessTest extends IgniteAbstractTest @InjectConfiguration private static MetaStorageConfiguration metaStorageConfiguration; - @InjectConfiguration - private static RaftConfiguration raftConfiguration; - /** * Returns a stream with test arguments. * @@ -92,9 +87,7 @@ public class MetaStorageDeployWatchesCorrectnessTest extends IgniteAbstractTest clock, mock(TopologyAwareRaftGroupServiceFactory.class), new NoOpMetricManager(), - metaStorageConfiguration, - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + metaStorageConfiguration ), StandaloneMetaStorageManager.create() ); diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java index e998f62024..14da02ec7c 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.metastorage.impl; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; 
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; @@ -50,7 +49,6 @@ import org.apache.ignite.internal.network.TopologyService; import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry; import org.apache.ignite.internal.raft.RaftManager; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; -import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.network.NodeMetadata; @@ -67,9 +65,6 @@ public class MetaStorageManagerRecoveryTest extends BaseIgniteAbstractTest { @InjectConfiguration private static MetaStorageConfiguration metaStorageConfiguration; - @InjectConfiguration - private static RaftConfiguration raftConfiguration; - private MetaStorageManagerImpl metaStorageManager; private KeyValueStorage kvs; @@ -94,9 +89,7 @@ public class MetaStorageManagerRecoveryTest extends BaseIgniteAbstractTest { clock, mock(TopologyAwareRaftGroupServiceFactory.class), new NoOpMetricManager(), - metaStorageConfiguration, - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + metaStorageConfiguration ); } diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java index d6b695729a..a27ce6062e 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java @@ -19,6 +19,9 @@ package 
org.apache.ignite.internal.metastorage.server; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.function.Function.identity; +import static org.apache.ignite.internal.hlc.HybridTimestamp.MAX_VALUE; +import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE; +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; import static org.apache.ignite.internal.metastorage.dsl.Operations.ops; import static org.apache.ignite.internal.metastorage.dsl.Operations.put; import static org.apache.ignite.internal.metastorage.dsl.Operations.remove; @@ -2093,6 +2096,63 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu verify(mockCallback, never()).onRevisionApplied(anyLong()); } + @Test + public void testRevisionByTimestamp() { + // Verify that in case of empty storage -1 will be returned. + assertEquals(-1, storage.revisionByTimestamp(MIN_VALUE)); + assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(2))); + assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(5))); + assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(7))); + assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(10))); + assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(12))); + assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(15))); + assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(17))); + assertEquals(-1, storage.revisionByTimestamp(MAX_VALUE)); + + // Populate storage with some data in order to have following revision to timestamp mapping: + // 1 -> 5 + // 2 -> 10 + // 3 -> 15 + { + storage.put(key(1), keyValue(1, 1), hybridTimestamp(5)); + assertEquals(1, storage.revision()); + + storage.put(key(1), keyValue(1, 1), hybridTimestamp(10)); + assertEquals(2, storage.revision()); + + storage.put(key(2), keyValue(2, 2), hybridTimestamp(15)); + assertEquals(3, storage.revision()); + } + + // Check revisionByTimestamp() + { + assertEquals(-1, 
storage.revisionByTimestamp(MIN_VALUE)); + + // There's no revision associated with 2, so closest left one is expected. + assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(2))); + + // Exact matching 1 -> 5 + assertEquals(1, storage.revisionByTimestamp(hybridTimestamp(5))); + + // There's no revision associated with 7, so closest left one is expected. + assertEquals(1, storage.revisionByTimestamp(hybridTimestamp(7))); + + // Exact matching 2 -> 10 + assertEquals(2, storage.revisionByTimestamp(hybridTimestamp(10))); + + // There's no revision associated with 12, so closest left one is expected. + assertEquals(2, storage.revisionByTimestamp(hybridTimestamp(12))); + + // Exact matching 3 -> 15 + assertEquals(3, storage.revisionByTimestamp(hybridTimestamp(15))); + + // There's no revision associated with 17, so closest left one is expected. + assertEquals(3, storage.revisionByTimestamp(hybridTimestamp(17))); + + assertEquals(3, storage.revisionByTimestamp(MAX_VALUE)); + } + } + private CompletableFuture<Void> watchExact( byte[] key, long revision, int expectedNumCalls, BiConsumer<WatchEvent, Integer> testCondition ) { @@ -2160,19 +2220,19 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu } void putToMs(byte[] key, byte[] value) { - storage.put(key, value, HybridTimestamp.MIN_VALUE); + storage.put(key, value, MIN_VALUE); } private void putAllToMs(List<byte[]> keys, List<byte[]> values) { - storage.putAll(keys, values, HybridTimestamp.MIN_VALUE); + storage.putAll(keys, values, MIN_VALUE); } private void removeFromMs(byte[] key) { - storage.remove(key, HybridTimestamp.MIN_VALUE); + storage.remove(key, MIN_VALUE); } private void removeAllFromMs(List<byte[]> keys) { - storage.removeAll(keys, HybridTimestamp.MIN_VALUE); + storage.removeAll(keys, MIN_VALUE); } private boolean invokeOnMs(Condition condition, Collection<Operation> success, Collection<Operation> failure) { @@ -2180,7 +2240,7 @@ public abstract class 
BasicOperationsKeyValueStorageTest extends AbstractKeyValu condition, success, failure, - HybridTimestamp.MIN_VALUE, + MIN_VALUE, new CommandIdGenerator(() -> UUID.randomUUID().toString()).newId() ); } @@ -2188,7 +2248,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu private StatementResult invokeOnMs(If iif) { return storage.invoke( iif, - HybridTimestamp.MIN_VALUE, + MIN_VALUE, new CommandIdGenerator(() -> UUID.randomUUID().toString()).newId() ); } diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java index 92623da88d..3cd5ed52fc 100644 --- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java +++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.metastorage.impl; import static java.util.Collections.singleton; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -29,7 +28,6 @@ import java.io.Serializable; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.function.LongSupplier; import org.apache.ignite.configuration.ConfigurationValue; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; @@ -108,9 +106,7 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl { keyValueStorage, 
mock(TopologyAwareRaftGroupServiceFactory.class), mockConfiguration(), - clock, - mockRaftConfiguration().retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + clock ); } @@ -131,9 +127,7 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl { KeyValueStorage storage, TopologyAwareRaftGroupServiceFactory raftServiceFactory, MetaStorageConfiguration configuration, - HybridClock clock, - ConfigurationValue<Long> idempotentCacheTtl, - CompletableFuture<LongSupplier> maxClockSkewMillisFuture + HybridClock clock ) { super( clusterService, @@ -144,9 +138,7 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl { clock, raftServiceFactory, new NoOpMetricManager(), - configuration, - idempotentCacheTtl, - maxClockSkewMillisFuture + configuration ); } diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java index 406510ffb3..3822585e73 100644 --- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java +++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java @@ -391,6 +391,20 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { } } + @Override + public long revisionByTimestamp(HybridTimestamp timestamp) { + synchronized (mux) { + Map.Entry<Long, Long> revisionEntry = tsToRevMap.floorEntry(timestamp.longValue()); + + if (revisionEntry == null) { + // Nothing to compact yet. 
+ return -1; + } + + return revisionEntry.getValue(); + } + } + @Override public void setRecoveryRevisionListener(@Nullable LongConsumer listener) { synchronized (mux) { @@ -493,15 +507,12 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx = new TreeMap<>(); - Map.Entry<Long, Long> revisionEntry = tsToRevMap.floorEntry(lowWatermark.longValue()); + long maxRevision = revisionByTimestamp(lowWatermark); - if (revisionEntry == null) { - // Nothing to compact yet. + if (maxRevision == -1) { return; } - long maxRevision = revisionEntry.getValue(); - keysIdx.forEach((key, revs) -> compactForKey(key, revs, compactedKeysIdx, compactedRevsIdx, maxRevision)); keysIdx = compactedKeysIdx; diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java index 5042c2621b..50eb3d999f 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java @@ -33,7 +33,6 @@ import static org.apache.ignite.internal.partition.replicator.PartitionReplicaLi import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; -import static org.apache.ignite.internal.testframework.TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn; import static org.apache.ignite.internal.util.ByteUtils.toByteArray; @@ -187,6 +186,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; @@ -524,7 +524,7 @@ public class ItReplicaLifecycleTest extends BaseIgniteAbstractTest { ); } - @Test + @RepeatedTest(100) @Disabled("https://issues.apache.org/jira/browse/IGNITE-22858") void testAlterFilterTrigger(TestInfo testInfo) throws Exception { startNodes(testInfo, 3); @@ -960,9 +960,7 @@ public class ItReplicaLifecycleTest extends BaseIgniteAbstractTest { hybridClock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration, - raftConfiguration.retryTimeout(), - completedFuture(() -> DEFAULT_MAX_CLOCK_SKEW_MS) + metaStorageConfiguration ) { @Override public CompletableFuture<Boolean> invoke( diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java index 2282eb86f5..24f7091e3e 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java @@ -266,9 +266,7 @@ public class MultiActorPlacementDriverTest extends BasePlacementDriverTest { nodeClock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration, - raftConfiguration.retryTimeout(), - completedFuture(clockService::maxClockSkewMillis) + 
metaStorageConfiguration ); if (this.metaStorageManager == null) { diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java index f586f89e0b..6a6f0a0edc 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java @@ -203,9 +203,7 @@ public class PlacementDriverManagerTest extends BasePlacementDriverTest { nodeClock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration, - raftConfiguration.retryTimeout(), - completedFuture(clockService::maxClockSkewMillis) + metaStorageConfiguration ); placementDriverManager = new PlacementDriverManager( diff --git a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java index 2913279d95..01e8aebd08 100644 --- a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java +++ b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java @@ -107,7 +107,7 @@ public abstract class ItAbstractListenerSnapshotTest<T extends RaftGroupListener private ScheduledExecutorService executor; @InjectConfiguration - protected RaftConfiguration raftConfiguration; + private RaftConfiguration raftConfiguration; /** * Create executor for raft group services. 
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle index 5442a588b3..5e66356dc3 100644 --- a/modules/runner/build.gradle +++ b/modules/runner/build.gradle @@ -56,6 +56,7 @@ dependencies { implementation project(':ignite-raft-api') implementation project(':ignite-raft') implementation project(':ignite-metastorage') + implementation project(':ignite-metastorage-cache') implementation project(':ignite-affinity') implementation project(':ignite-table') implementation project(':ignite-index') @@ -147,6 +148,7 @@ dependencies { exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind' } integrationTestImplementation project(':ignite-metastorage') + integrationTestImplementation project(':ignite-metastorage-cache') integrationTestImplementation project(':ignite-metrics') integrationTestImplementation project(':ignite-table') integrationTestImplementation project(':ignite-transactions') diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java index 48884adb1d..a2ad98eb88 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java @@ -18,9 +18,7 @@ package org.apache.ignite.internal.configuration; import static java.util.concurrent.CompletableFuture.allOf; -import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toUnmodifiableList; -import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; @@ -212,9 +210,7 @@ public class ItDistributedConfigurationPropertiesTest extends BaseIgniteAbstract clock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration, - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + metaStorageConfiguration ); deployWatchesFut = metaStorageManager.deployWatches(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java index cff23967a3..4ca6f6c31d 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.configuration.storage; -import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; @@ -184,9 +182,7 @@ public class ItDistributedConfigurationStorageTest extends BaseIgniteAbstractTes clock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration, - raftConfiguration.retryTimeout(), - completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + 
metaStorageConfiguration ); deployWatchesFut = metaStorageManager.deployWatches(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 7064e297a9..0637c23cc6 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -438,9 +438,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { hybridClock, topologyAwareRaftGroupServiceFactory, metricManager, - metaStorageConfiguration, - raftConfiguration.retryTimeout(), - maxClockSkewFuture + metaStorageConfiguration ) { @Override public CompletableFuture<Boolean> invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure) { diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 9ce9ef6e3a..28bcdc769e 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.app; import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE; import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ; @@ -134,6 +135,7 @@ import org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventPara import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent; import 
org.apache.ignite.internal.manager.ComponentContext; import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.cache.IdempotentCacheVacuumizer; import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; @@ -321,6 +323,9 @@ public class IgniteImpl implements Ignite { /** Configuration manager that handles cluster (distributed) configuration. */ private final ConfigurationManager clusterCfgMgr; + /** Idempotent cache vacuumizer. */ + private final IdempotentCacheVacuumizer idempotentCacheVacuumizer; + /** Cluster initializer. */ private final ClusterInitializer clusterInitializer; @@ -588,8 +593,6 @@ public class IgniteImpl implements Ignite { raftGroupEventsClientListener ); - CompletableFuture<LongSupplier> maxClockSkewMillisFuture = new CompletableFuture<>(); - metaStorageMgr = new MetaStorageManagerImpl( clusterSvc, cmgMgr, @@ -598,9 +601,7 @@ public class IgniteImpl implements Ignite { new RocksDbKeyValueStorage(name, workDir.resolve(METASTORAGE_DB_PATH), failureProcessor), clock, topologyAwareRaftGroupServiceFactory, - metricManager, - raftConfiguration.retryTimeout(), - maxClockSkewMillisFuture + metricManager ); this.cfgStorage = new DistributedConfigurationStorage(name, metaStorageMgr); @@ -622,7 +623,18 @@ public class IgniteImpl implements Ignite { clockService = new ClockServiceImpl(clock, clockWaiter, new SameValueLongSupplier(() -> schemaSyncConfig.maxClockSkew().value())); - maxClockSkewMillisFuture.complete(clockService::maxClockSkewMillis); + idempotentCacheVacuumizer = new IdempotentCacheVacuumizer( + name, + threadPoolsManager.commonScheduler(), + metaStorageMgr::evictIdempotentCommandsCache, + nodeCfgMgr.configurationRegistry().getConfiguration(RaftConfiguration.KEY).retryTimeout(), + clockService, + 1, + 
1, + MINUTES + ); + + metaStorageMgr.addElectionListener(idempotentCacheVacuumizer); Consumer<LongFunction<CompletableFuture<?>>> registry = c -> metaStorageMgr.registerRevisionUpdateListener(c::apply); @@ -1114,7 +1126,6 @@ public class IgniteImpl implements Ignite { clusterSvc.updateMetadata( new NodeMetadata(restComponent.hostName(), restComponent.httpPort(), restComponent.httpsPort())); - } catch (Throwable e) { startupExecutor.shutdownNow(); @@ -1168,6 +1179,7 @@ public class IgniteImpl implements Ignite { catalogCompactionRunner, indexMetaStorage, clusterCfgMgr, + idempotentCacheVacuumizer, authenticationManager, placementDriverMgr, metricManager, diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index b61fb76b24..fcc7294998 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.rebalance; import static java.util.Collections.reverse; import static java.util.concurrent.CompletableFuture.allOf; -import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toSet; @@ -1174,9 +1173,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { hybridClock, topologyAwareRaftGroupServiceFactory, metricManager, - metaStorageConfiguration, - raftConfiguration.retryTimeout(), - completedFuture(() -> DEFAULT_MAX_CLOCK_SKEW_MS) + metaStorageConfiguration ); placementDriver = new TestPlacementDriver(() -> PRIMARY_FILTER.apply(clusterService.topologyService().allMembers())); diff --git 
a/settings.gradle b/settings.gradle index 00016e3ec5..a5c995b2b1 100644 --- a/settings.gradle +++ b/settings.gradle @@ -54,6 +54,7 @@ include(':ignite-runner') include(':ignite-index') include(':ignite-metastorage-api') include(':ignite-metastorage') +include(':ignite-metastorage-cache') include(':ignite-rest-api') include(':ignite-storage-rocksdb') include(':ignite-configuration-annotation-processor') @@ -126,6 +127,7 @@ project(":ignite-runner").projectDir = file('modules/runner') project(":ignite-index").projectDir = file('modules/index') project(":ignite-metastorage-api").projectDir = file('modules/metastorage-api') project(":ignite-metastorage").projectDir = file('modules/metastorage') +project(":ignite-metastorage-cache").projectDir = file('modules/metastorage-cache') project(":ignite-rest-api").projectDir = file('modules/rest-api') project(":ignite-storage-rocksdb").projectDir = file('modules/storage-rocksdb') project(":ignite-configuration-annotation-processor").projectDir = file('modules/configuration-annotation-processor')