This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new b594483b22 IGNITE-22214 Meta storage idempotent invokes: implement idempotent cache cleanup logic (#3830) b594483b22 is described below commit b594483b22d473ed55af7552757c32e54d445175 Author: Alexander Lapin <lapin1...@gmail.com> AuthorDate: Thu Jun 6 17:35:26 2024 +0300 IGNITE-22214 Meta storage idempotent invokes: implement idempotent cache cleanup logic (#3830) --- .../DistributionZoneRebalanceEngineTest.java | 16 ++++- .../RebalanceUtilUpdateAssignmentsTest.java | 17 +++++- .../impl/ItIdempotentCommandCacheTest.java | 64 ++++++++++++++++++-- .../impl/ItMetaStorageManagerImplTest.java | 13 +++- .../ItMetaStorageMultipleNodesAbstractTest.java | 6 +- .../impl/ItMetaStorageServicePersistenceTest.java | 9 ++- .../metastorage/impl/ItMetaStorageServiceTest.java | 13 +++- .../metastorage/impl/ItMetaStorageWatchTest.java | 6 +- .../server/raft/ItMetaStorageRaftGroupTest.java | 41 +++++++++++-- .../metastorage/impl/MetaStorageManagerImpl.java | 69 ++++++++++++++++++++-- .../server/raft/MetaStorageListener.java | 29 ++++++++- .../server/raft/MetaStorageWriteHandler.java | 61 ++++++++++++++++++- .../impl/IdempotentCommandCacheTest.java | 28 ++++++--- .../MetaStorageDeployWatchesCorrectnessTest.java | 9 ++- .../impl/MetaStorageManagerRecoveryTest.java | 9 ++- .../impl/StandaloneMetaStorageManager.java | 19 +++++- .../MultiActorPlacementDriverTest.java | 9 ++- .../PlacementDriverManagerTest.java | 9 ++- .../service/ItAbstractListenerSnapshotTest.java | 2 +- .../ItDistributedConfigurationPropertiesTest.java | 6 +- .../ItDistributedConfigurationStorageTest.java | 6 +- .../runner/app/ItIgniteNodeRestartTest.java | 9 ++- .../org/apache/ignite/internal/app/IgniteImpl.java | 8 ++- .../rebalance/ItRebalanceDistributedTest.java | 8 ++- 24 files changed, 410 insertions(+), 56 deletions(-) diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java index 445fa9cfd9..cc301bce76 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java @@ -30,6 +30,7 @@ import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey; +import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; @@ -72,6 +73,8 @@ import org.apache.ignite.internal.affinity.Assignments; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.commands.ColumnParams; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.distributionzones.DistributionZoneManager; import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil; import org.apache.ignite.internal.distributionzones.Node; @@ -100,6 +103,7 @@ import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.WriteCommand; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.TablePartitionId; @@ -112,10 +116,12 @@ import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; /** * Tests the distribution zone dataNodes watch listener in {@link DistributionZoneRebalanceEngine}. */ +@ExtendWith(ConfigurationExtension.class) public class DistributionZoneRebalanceEngineTest extends IgniteAbstractTest { private static final String ZONE_NAME_0 = "zone0"; @@ -141,6 +147,9 @@ public class DistributionZoneRebalanceEngineTest extends IgniteAbstractTest { private ScheduledExecutorService rebalanceScheduler; + @InjectConfiguration + private RaftConfiguration raftConfiguration; + @BeforeEach public void setUp() { String nodeName = "test"; @@ -180,7 +189,12 @@ public class DistributionZoneRebalanceEngineTest extends IgniteAbstractTest { keyValueStorage = spy(new SimpleInMemoryKeyValueStorage(nodeName)); - MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage, mock(ClusterTimeImpl.class)); + MetaStorageListener metaStorageListener = new MetaStorageListener( + keyValueStorage, + mock(ClusterTimeImpl.class), + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + ); RaftGroupService metaStorageService = mock(RaftGroupService.class); diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java index 8e98c825c2..af146edfcc 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java @@ -17,10 +17,12 @@ package org.apache.ignite.internal.distributionzones.rebalance; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition; import static org.apache.ignite.internal.affinity.Assignments.toBytes; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; +import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -42,6 +44,8 @@ import org.apache.ignite.internal.affinity.Assignment; import org.apache.ignite.internal.affinity.Assignments; import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.logger.IgniteLogger; @@ -59,6 +63,7 @@ import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.WriteCommand; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.TablePartitionId; @@ -76,7 +81,7 @@ import org.mockito.quality.Strictness; /** * Tests for updating assignment in the meta storage. */ -@ExtendWith(MockitoExtension.class) +@ExtendWith({MockitoExtension.class, ConfigurationExtension.class}) @MockitoSettings(strictness = Strictness.LENIENT) public class RebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest { private static final IgniteLogger LOG = Loggers.forClass(RebalanceUtilUpdateAssignmentsTest.class); @@ -99,6 +104,9 @@ public class RebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest { DEFAULT_STORAGE_PROFILE ); + @InjectConfiguration + private RaftConfiguration raftConfiguration; + private static final int partNum = 2; private static final int replicas = 2; @@ -122,7 +130,12 @@ public class RebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest { keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test")); - MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage, mock(ClusterTimeImpl.class)); + MetaStorageListener metaStorageListener = new MetaStorageListener( + keyValueStorage, + mock(ClusterTimeImpl.class), + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + ); RaftGroupService metaStorageService = mock(RaftGroupService.class); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java index 66c0610155..453a80eca4 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.metastorage.impl; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toSet; +import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; import static org.apache.ignite.internal.metastorage.dsl.Operations.ops; import static org.apache.ignite.internal.metastorage.dsl.Operations.put; @@ -30,6 +32,7 @@ import static org.apache.ignite.internal.util.IgniteUtils.startAsync; import static org.apache.ignite.internal.util.IgniteUtils.stopAsync; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -51,8 +54,12 @@ import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.failure.NoOpFailureProcessor; +import org.apache.ignite.internal.hlc.ClockService; +import org.apache.ignite.internal.hlc.ClockServiceImpl; +import org.apache.ignite.internal.hlc.ClockWaiter; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.manager.ComponentContext; @@ -114,8 +121,9 @@ public class ItIdempotentCommandCacheTest extends IgniteAbstractTest { private static final int YIELD_RESULT = 10; private static final int ANOTHER_YIELD_RESULT = 20; - @InjectConfiguration("mock.responseTimeout = 100") + @InjectConfiguration("mock.retryTimeout = 10000") private RaftConfiguration raftConfiguration; + @InjectConfiguration("mock.idleSyncTimeInterval = 100") private MetaStorageConfiguration metaStorageConfiguration; @@ -132,6 +140,10 @@ public class ItIdempotentCommandCacheTest extends IgniteAbstractTest { ClusterManagementGroupManager cmgManager; + ClockWaiter clockWaiter; + + ClockService clockService; + Node( TestInfo testInfo, RaftConfiguration raftConfiguration, @@ -184,7 +196,17 @@ public class ItIdempotentCommandCacheTest extends IgniteAbstractTest { clock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration + metaStorageConfiguration, + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + ); + + clockWaiter = new ClockWaiter(clusterService.nodeName(), clock); + + clockService = new ClockServiceImpl( + clock, + clockWaiter, + () -> TEST_MAX_CLOCK_SKEW_MILLIS ); } @@ -193,7 +215,10 @@ public class ItIdempotentCommandCacheTest extends IgniteAbstractTest { when(cmgManager.metaStorageNodes()).thenReturn(metaStorageNodesFut); } - assertThat(startAsync(new ComponentContext(), clusterService, raftManager, metaStorageManager), willCompleteSuccessfully()); + assertThat( + startAsync(new ComponentContext(), clusterService, raftManager, metaStorageManager, clockWaiter), + willCompleteSuccessfully() + ); } void deployWatches() { @@ -201,7 +226,7 @@ public class ItIdempotentCommandCacheTest extends IgniteAbstractTest { } void stop() throws Exception { - List<IgniteComponent> components = List.of(metaStorageManager, raftManager, clusterService); + List<IgniteComponent> components = List.of(clockWaiter, metaStorageManager, raftManager, clusterService); closeAll(Stream.concat( components.stream().map(c -> c::beforeNodeStop), @@ -364,6 +389,8 @@ public class ItIdempotentCommandCacheTest extends IgniteAbstractTest { // Restart cluster. startCluster(testInfo); + long timestampAfterRestartPhysicalLong = nodes.get(0).clockService.now().getPhysical(); + leader = leader(raftClient()); // Run same idempotent command one more time and check that condition wasn't re-evaluated, but was retrieved from the cache instead. @@ -374,9 +401,36 @@ public class ItIdempotentCommandCacheTest extends IgniteAbstractTest { assertTrue((Boolean) commandProcessingResult2); assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(), TEST_VALUE)); } else { - assertEquals(YIELD_RESULT, ((StatementResult) commandProcessingResult).getAsInt()); + assertEquals(YIELD_RESULT, ((StatementResult) commandProcessingResult2).getAsInt()); assertTrue(leader.checkValueInStorage(TEST_KEY_2.bytes(), TEST_VALUE_2)); } + + for (Node node : nodes) { + assertThat(node.clockService.waitFor( + new HybridTimestamp( + timestampAfterRestartPhysicalLong + raftConfiguration.retryTimeout().value() + + node.clockService.maxClockSkewMillis(), + 0 + ) + ), willCompleteSuccessfully()); + } + + for (Node node : nodes) { + node.metaStorageManager.evictIdempotentCommandsCache(); + } + + + // Run same idempotent command one more time and check that condition **was** re-evaluated and not retrieved from the cache. + CompletableFuture<Object> commandProcessingResultFuture3 = raftClient().run(idempotentCommand); + assertThat(commandProcessingResultFuture3, willCompleteSuccessfully()); + Object commandProcessingResult3 = commandProcessingResultFuture3.get(); + if (idempotentCommand instanceof InvokeCommand) { + assertFalse((Boolean) commandProcessingResult3); + assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(), ANOTHER_VALUE)); + } else { + assertEquals(ANOTHER_YIELD_RESULT, ((StatementResult) commandProcessingResult3).getAsInt()); + assertTrue(leader.checkValueInStorage(TEST_KEY_2.bytes(), ANOTHER_VALUE_2)); + } } private Node leader(RaftGroupService raftClient) { diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java index ab273a4d70..024087ba0b 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.metastorage.impl; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService; import static org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast; @@ -99,10 +100,12 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { private MetaStorageManagerImpl metaStorageManager; + @InjectConfiguration + private RaftConfiguration raftConfiguration; + @BeforeEach void setUp( TestInfo testInfo, - @InjectConfiguration RaftConfiguration raftConfiguration, @InjectConfiguration("mock.idleSyncTimeInterval = 100") MetaStorageConfiguration metaStorageConfiguration ) { var addr = new NetworkAddress("localhost", 10_000); @@ -143,7 +146,9 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { clock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration + metaStorageConfiguration, + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) ); assertThat( @@ -230,7 +235,9 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { storage, new HybridClockImpl(), mock(TopologyAwareRaftGroupServiceFactory.class), - new NoOpMetricManager() + new NoOpMetricManager(), + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) ); assertThat(metaStorageManager.stopAsync(new ComponentContext()), willCompleteSuccessfully()); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java index 5e55468fb1..b565090691 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.metastorage.impl; import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toSet; +import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision; import static org.apache.ignite.internal.metastorage.dsl.Operations.noop; @@ -202,7 +204,9 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr clock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration + metaStorageConfiguration, + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) ); deployWatchesFut = metaStorageManager.deployWatches(); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java index d73d74e6b9..ac10b1c626 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.metastorage.impl; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.junit.jupiter.api.Assertions.assertEquals; import java.nio.charset.StandardCharsets; @@ -158,7 +160,12 @@ public class ItMetaStorageServicePersistenceTest extends ItAbstractListenerSnaps return s; }); - return new MetaStorageListener(storage, new ClusterTimeImpl(nodeName, new IgniteSpinBusyLock(), new HybridClockImpl())); + return new MetaStorageListener( + storage, + new ClusterTimeImpl(nodeName, new IgniteSpinBusyLock(), new HybridClockImpl()), + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + ); } /** {@inheritDoc} */ diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java index ab00dedab7..36332b02a9 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java @@ -17,8 +17,10 @@ package org.apache.ignite.internal.metastorage.impl; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toUnmodifiableSet; +import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.metastorage.dsl.Conditions.and; import static org.apache.ignite.internal.metastorage.dsl.Conditions.or; import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision; @@ -186,8 +188,11 @@ public class ItMetaStorageServiceTest extends BaseIgniteAbstractTest { private MetaStorageService metaStorageService; + private RaftConfiguration raftConfiguration; + Node(ClusterService clusterService, RaftConfiguration raftConfiguration, Path dataPath) { this.clusterService = clusterService; + this.raftConfiguration = raftConfiguration; HybridClock clock = new HybridClockImpl(); @@ -197,7 +202,6 @@ public class ItMetaStorageServiceTest extends BaseIgniteAbstractTest { dataPath.resolve(name()), clock ); - this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), new IgniteSpinBusyLock(), clock); this.mockStorage = mock(KeyValueStorage.class); @@ -234,7 +238,12 @@ public class ItMetaStorageServiceTest extends BaseIgniteAbstractTest { assert peer != null; - var listener = new MetaStorageListener(mockStorage, clusterTime); + var listener = new MetaStorageListener( + mockStorage, + clusterTime, + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + ); var raftNodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, peer); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java index 583d08488f..947f0869a9 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java @@ -17,8 +17,10 @@ package org.apache.ignite.internal.metastorage.impl; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; +import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; @@ -193,7 +195,9 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest { clock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration + metaStorageConfiguration, + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) ); components.add(metaStorageManager); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java index 27d09f80a5..50894e927e 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java @@ -17,8 +17,10 @@ package org.apache.ignite.internal.metastorage.server.raft; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toSet; +import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses; import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.waitForTopology; import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults; @@ -406,18 +408,45 @@ public class ItMetaStorageRaftGroupTest extends IgniteAbstractTest { var raftNodeId1 = new RaftNodeId(MetastorageGroupId.INSTANCE, membersConfiguration.peer(localMemberName(cluster.get(0)))); - metaStorageRaftSrv1.startRaftNode(raftNodeId1, membersConfiguration, - new MetaStorageListener(mockStorage, mock(ClusterTimeImpl.class)), defaults()); + metaStorageRaftSrv1.startRaftNode( + raftNodeId1, + membersConfiguration, + new MetaStorageListener( + mockStorage, + mock(ClusterTimeImpl.class), + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + ), + defaults() + ); var raftNodeId2 = new RaftNodeId(MetastorageGroupId.INSTANCE, membersConfiguration.peer(localMemberName(cluster.get(1)))); - metaStorageRaftSrv2.startRaftNode(raftNodeId2, membersConfiguration, - new MetaStorageListener(mockStorage, mock(ClusterTimeImpl.class)), defaults()); + metaStorageRaftSrv2.startRaftNode( + raftNodeId2, + membersConfiguration, + new MetaStorageListener( + mockStorage, + mock(ClusterTimeImpl.class), + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + ), + defaults() + ); var raftNodeId3 = new RaftNodeId(MetastorageGroupId.INSTANCE, membersConfiguration.peer(localMemberName(cluster.get(2)))); - metaStorageRaftSrv3.startRaftNode(raftNodeId3, membersConfiguration, - new MetaStorageListener(mockStorage, mock(ClusterTimeImpl.class)), defaults()); + metaStorageRaftSrv3.startRaftNode( + raftNodeId3, + membersConfiguration, + new MetaStorageListener( + mockStorage, + mock(ClusterTimeImpl.class), + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) + ), + defaults() + ); metaStorageRaftGrpSvc1 = waitForRaftGroupServiceSafely(RaftGroupServiceImpl.start( MetastorageGroupId.INSTANCE, diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index cb63d3ddf9..01f87b37a5 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -30,6 +30,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongSupplier; +import org.apache.ignite.configuration.ConfigurationValue; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.hlc.HybridClock; @@ -130,6 +132,14 @@ public class MetaStorageManagerImpl implements MetaStorageManager { private volatile MetaStorageConfiguration metaStorageConfiguration; + private final ConfigurationValue<Long> idempotentCacheTtl; + + private final CompletableFuture<LongSupplier> maxClockSkewMillisFuture; + + private volatile MetaStorageListener followerListener; + + private volatile MetaStorageListener learnerListener; + /** * The constructor. * @@ -140,6 +150,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { * @param storage Storage. This component owns this resource and will manage its lifecycle. * @param clock A hybrid logical clock. * @param metricManager Metric manager. + * @param maxClockSkewMillisFuture Future with maximum clock skew in milliseconds. */ public MetaStorageManagerImpl( ClusterService clusterService, @@ -149,7 +160,9 @@ public class MetaStorageManagerImpl implements MetaStorageManager { KeyValueStorage storage, HybridClock clock, TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory, - MetricManager metricManager + MetricManager metricManager, + ConfigurationValue<Long> idempotentCacheTtl, + CompletableFuture<LongSupplier> maxClockSkewMillisFuture ) { this.clusterService = clusterService; this.raftMgr = raftMgr; @@ -157,9 +170,11 @@ public class MetaStorageManagerImpl implements MetaStorageManager { this.logicalTopologyService = logicalTopologyService; this.storage = storage; this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), busyLock, clock); - metaStorageMetricSource = new MetaStorageMetricSource(clusterTime); + this.metaStorageMetricSource = new MetaStorageMetricSource(clusterTime); this.topologyAwareRaftGroupServiceFactory = topologyAwareRaftGroupServiceFactory; this.metricManager = metricManager; + this.idempotentCacheTtl = idempotentCacheTtl; + this.maxClockSkewMillisFuture = maxClockSkewMillisFuture; } /** @@ -175,9 +190,22 @@ public class MetaStorageManagerImpl implements MetaStorageManager { HybridClock clock, TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory, MetricManager metricManager, - MetaStorageConfiguration configuration + MetaStorageConfiguration configuration, + ConfigurationValue<Long> idempotentCacheTtl, + CompletableFuture<LongSupplier> maxClockSkewMillisFuture ) { - this(clusterService, cmgMgr, logicalTopologyService, raftMgr, storage, clock, topologyAwareRaftGroupServiceFactory, metricManager); + this( + clusterService, + cmgMgr, + logicalTopologyService, + raftMgr, + storage, + clock, + topologyAwareRaftGroupServiceFactory, + metricManager, + idempotentCacheTtl, + maxClockSkewMillisFuture + ); configure(configuration); } @@ -293,10 +321,17 @@ public class MetaStorageManagerImpl implements MetaStorageManager { assert localMetaStorageConfiguration != null : "Meta Storage configuration has not been set"; + followerListener = new MetaStorageListener( + storage, + clusterTime, + idempotentCacheTtl, + maxClockSkewMillisFuture + ); + CompletableFuture<TopologyAwareRaftGroupService> raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture( new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer), configuration, - new MetaStorageListener(storage, clusterTime), + followerListener, RaftGroupEventsListener.noopLsnr, disruptorConfig, topologyAwareRaftGroupServiceFactory @@ -334,10 +369,17 @@ public class MetaStorageManagerImpl implements MetaStorageManager { assert localPeer != null; + learnerListener = new MetaStorageListener( + storage, + clusterTime, + idempotentCacheTtl, + maxClockSkewMillisFuture + ); + return raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture( new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer), configuration, - new MetaStorageListener(storage, clusterTime), + learnerListener, RaftGroupEventsListener.noopLsnr, disruptorConfig ); @@ -845,4 +887,19 @@ public class MetaStorageManagerImpl implements MetaStorageManager { public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart() { return recoveryFinishedFuture.thenCompose(storage::notifyRevisionUpdateListenerOnStart); } + + /** + * Removes obsolete entries from both volatile and persistent idempotent command cache. + */ + @TestOnly + @Deprecated(forRemoval = true) + // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 cache eviction should be triggered by MS GC instead. + public void evictIdempotentCommandsCache() { + if (followerListener != null) { + followerListener.evictIdempotentCommandsCache(); + } + if (learnerListener != null) { + learnerListener.evictIdempotentCommandsCache(); + } + } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java index a49e594983..163bd2dcad 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java @@ -24,7 +24,11 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import java.util.function.LongSupplier; +import org.apache.ignite.configuration.ConfigurationValue; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.command.GetAllCommand; @@ -44,6 +48,7 @@ import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.util.Cursor; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; /** * Meta storage listener. @@ -60,9 +65,19 @@ public class MetaStorageListener implements RaftGroupListener, BeforeApplyHandle * * @param storage Storage. */ - public MetaStorageListener(KeyValueStorage storage, ClusterTimeImpl clusterTime) { + public MetaStorageListener( + KeyValueStorage storage, + ClusterTimeImpl clusterTime, + ConfigurationValue<Long> idempotentCacheTtl, + CompletableFuture<LongSupplier> maxClockSkewMillisFuture + ) { this.storage = storage; - this.writeHandler = new MetaStorageWriteHandler(storage, clusterTime); + this.writeHandler = new MetaStorageWriteHandler( + storage, + clusterTime, + idempotentCacheTtl, + maxClockSkewMillisFuture + ); } @Override @@ -174,4 +189,14 @@ public class MetaStorageListener implements RaftGroupListener, BeforeApplyHandle @Override public void onShutdown() { } + + /** + * Removes obsolete entries from both volatile and persistent idempotent command cache. + */ + @TestOnly + @Deprecated(forRemoval = true) + // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 cache eviction should be triggered by MS GC instead. + public void evictIdempotentCommandsCache() { + writeHandler.evictIdempotentCommandsCache(HybridTimestamp.MIN_VALUE); + } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java index 95c3685fb1..33bc88b658 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java @@ -18,13 +18,18 @@ package org.apache.ignite.internal.metastorage.server.raft; import static java.util.Arrays.copyOfRange; +import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.util.ByteUtils.byteToBoolean; import java.io.Serializable; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.LongSupplier; +import org.apache.ignite.configuration.ConfigurationValue; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.logger.IgniteLogger; @@ -61,6 +66,7 @@ import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.service.CommandClosure; +import org.apache.ignite.internal.util.ArrayUtils; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.util.Cursor; import org.jetbrains.annotations.Nullable; @@ -81,9 +87,20 @@ public class MetaStorageWriteHandler { private final Map<CommandId, IdempotentCommandCachedResult> idempotentCommandCache = new ConcurrentHashMap<>(); - MetaStorageWriteHandler(KeyValueStorage storage, ClusterTimeImpl clusterTime) { + private final ConfigurationValue<Long> idempotentCacheTtl; + + private final CompletableFuture<LongSupplier> maxClockSkewMillisFuture; + + MetaStorageWriteHandler( + KeyValueStorage storage, + ClusterTimeImpl clusterTime, + ConfigurationValue<Long> idempotentCacheTtl, + CompletableFuture<LongSupplier> maxClockSkewMillisFuture + ) { this.storage = storage; this.clusterTime = clusterTime; + this.idempotentCacheTtl = idempotentCacheTtl; + this.maxClockSkewMillisFuture = maxClockSkewMillisFuture; } /** @@ -95,7 +112,14 @@ public class MetaStorageWriteHandler { CommandClosure<WriteCommand> resultClosure; if (command instanceof IdempotentCommand) { - CommandId commandId = ((IdempotentCommand) command).id(); + IdempotentCommand idempotentCommand = ((IdempotentCommand) command); + CommandId commandId = idempotentCommand.id(); + + // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Remove. + if (idempotentCommand.safeTime().getPhysical() % 100 == 0) { + evictIdempotentCommandsCache(((IdempotentCommand) command).safeTime()); + } + IdempotentCommandCachedResult cachedResult = idempotentCommandCache.get(commandId); if (cachedResult != null) { @@ -352,6 +376,39 @@ public class MetaStorageWriteHandler { } } + /** + * Removes obsolete entries from both volatile and persistent idempotent command cache. + * + * @param safeTime Trigger operation safe time. TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Remove. + */ + // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Call on meta storage compaction. + void evictIdempotentCommandsCache(HybridTimestamp safeTime) { + HybridTimestamp cleanupTimestamp = clusterTime.now(); + LOG.info("Idempotent command cache cleanup started [cleanupTimestamp={}].", cleanupTimestamp); + + maxClockSkewMillisFuture.thenAccept(maxClockSkewMillis -> { + List<CommandId> commandIdsToRemove = idempotentCommandCache.entrySet().stream() + .filter(entry -> entry.getValue().commandStartTime.getPhysical() + <= cleanupTimestamp.getPhysical() - (idempotentCacheTtl.value() + maxClockSkewMillis.getAsLong())) + .map(entry -> entry.getKey()) + .collect(toList()); + + if (!commandIdsToRemove.isEmpty()) { + List<byte[]> commandIdStorageKeys = commandIdsToRemove.stream() + .map(commandId -> ArrayUtils.concat(new byte[]{}, ByteUtils.toBytes(commandId))) + .collect(toList()); + + storage.removeAll(commandIdStorageKeys, safeTime); + + commandIdsToRemove.forEach(idempotentCommandCache.keySet()::remove); + } + + LOG.info("Idempotent command cache cleanup finished [cleanupTimestamp={}, cleanupCompletionTimestamp={}," + + " removedEntriesCount={}, cacheSize={}].", cleanupTimestamp, clusterTime.now(), commandIdsToRemove.size(), + idempotentCommandCache.size()); + }); + } + private static class IdempotentCommandCachedResult { @Nullable final Serializable result; diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java index 968e129d8c..c1cdd33bfd 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.metastorage.impl; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; import static org.apache.ignite.internal.metastorage.dsl.Operations.ops; import static org.apache.ignite.internal.metastorage.dsl.Operations.put; @@ -32,6 +34,8 @@ import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List; import java.util.UUID; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.lang.ByteArray; @@ -47,22 +51,27 @@ import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStora import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; import org.apache.ignite.internal.raft.WriteCommand; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.service.CommandClosure; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; /** * Tests for idempotency of {@link org.apache.ignite.internal.metastorage.command.IdempotentCommand}. */ -public class IdempotentCommandCacheTest { +@ExtendWith(ConfigurationExtension.class) +public class IdempotentCommandCacheTest extends BaseIgniteAbstractTest { private static final String NODE_NAME = "node"; private static final MetaStorageCommandsFactory CMD_FACTORY = new MetaStorageCommandsFactory(); - private final KeyValueStorage storage; + private KeyValueStorage storage; - private final MetaStorageListener metaStorageListener; + private MetaStorageListener metaStorageListener; private final HybridClock clock = new HybridClockImpl(); @@ -71,14 +80,17 @@ public class IdempotentCommandCacheTest { private final CommandIdGenerator commandIdGenerator = new CommandIdGenerator(() -> UUID.randomUUID().toString()); - /** - * Constructor. - */ - public IdempotentCommandCacheTest() { + @InjectConfiguration + private RaftConfiguration raftConfiguration; + + @BeforeEach + public void setUp() { storage = new SimpleInMemoryKeyValueStorage(NODE_NAME); metaStorageListener = new MetaStorageListener( storage, - new ClusterTimeImpl(NODE_NAME, new IgniteSpinBusyLock(), clock) + new ClusterTimeImpl(NODE_NAME, new IgniteSpinBusyLock(), clock), + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) ); } diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java index d6fc9c0431..c548f0d734 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.metastorage.impl; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -43,6 +44,7 @@ import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.raft.RaftManager; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -56,6 +58,9 @@ public class MetaStorageDeployWatchesCorrectnessTest extends IgniteAbstractTest @InjectConfiguration private static MetaStorageConfiguration metaStorageConfiguration; + @InjectConfiguration + private static RaftConfiguration raftConfiguration; + /** * Returns a stream with test arguments. * @@ -87,7 +92,9 @@ public class MetaStorageDeployWatchesCorrectnessTest extends IgniteAbstractTest clock, mock(TopologyAwareRaftGroupServiceFactory.class), new NoOpMetricManager(), - metaStorageConfiguration + metaStorageConfiguration, + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) ), StandaloneMetaStorageManager.create() ); diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java index a8b86c57b2..ac65131cac 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.metastorage.impl; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; @@ -48,6 +49,7 @@ import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry; import org.apache.ignite.internal.raft.RaftManager; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.network.NodeMetadata; @@ -65,6 +67,9 @@ public class MetaStorageManagerRecoveryTest extends BaseIgniteAbstractTest { @InjectConfiguration private static MetaStorageConfiguration metaStorageConfiguration; + @InjectConfiguration + private static RaftConfiguration raftConfiguration; + private MetaStorageManagerImpl metaStorageManager; private KeyValueStorage kvs; @@ -89,7 +94,9 @@ public class MetaStorageManagerRecoveryTest extends BaseIgniteAbstractTest { clock, mock(TopologyAwareRaftGroupServiceFactory.class), new NoOpMetricManager(), - metaStorageConfiguration + metaStorageConfiguration, + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) ); } diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java index c9b02c071a..1e70463ad3 100644 --- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java +++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.impl; import static java.util.Collections.singleton; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -28,6 +29,7 @@ import java.io.Serializable; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.function.LongSupplier; import org.apache.ignite.configuration.ConfigurationValue; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; @@ -46,6 +48,7 @@ import org.apache.ignite.internal.raft.ReadCommand; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.service.BeforeApplyHandler; import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupListener; @@ -105,7 +108,9 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl { keyValueStorage, mock(TopologyAwareRaftGroupServiceFactory.class), mockConfiguration(), - clock + clock, + mockRaftConfiguration().retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) ); } @@ -126,7 +131,9 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl { KeyValueStorage storage, TopologyAwareRaftGroupServiceFactory raftServiceFactory, MetaStorageConfiguration configuration, - HybridClock clock + HybridClock clock, + ConfigurationValue<Long> idempotentCacheTtl, + CompletableFuture<LongSupplier> maxClockSkewMillisFuture ) { super( clusterService, @@ -137,7 +144,9 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl { clock, raftServiceFactory, new NoOpMetricManager(), - configuration + configuration, + idempotentCacheTtl, + maxClockSkewMillisFuture ); } @@ -211,6 +220,10 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl { return configuration; } + private static RaftConfiguration mockRaftConfiguration() { + return mock(RaftConfiguration.class, LENIENT_SETTINGS); + } + private static CompletableFuture<Serializable> runCommand(Command command, RaftGroupListener listener) { CompletableFuture<Serializable> future = new CompletableFuture<>(); diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java index ad403899f3..2282eb86f5 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java @@ -43,6 +43,7 @@ import java.util.stream.IntStream; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.TestClockService; @@ -254,6 +255,8 @@ public class MultiActorPlacementDriverTest extends BasePlacementDriverTest { var storage = new SimpleInMemoryKeyValueStorage(nodeName); + ClockService clockService = new TestClockService(nodeClock); + var metaStorageManager = new MetaStorageManagerImpl( clusterService, cmgManager, @@ -263,7 +266,9 @@ public class MultiActorPlacementDriverTest extends BasePlacementDriverTest { nodeClock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration + metaStorageConfiguration, + raftConfiguration.retryTimeout(), + completedFuture(clockService::maxClockSkewMillis) ); if (this.metaStorageManager == null) { @@ -279,7 +284,7 @@ public class MultiActorPlacementDriverTest extends BasePlacementDriverTest { logicalTopologyService, raftManager, topologyAwareRaftGroupServiceFactory, - new TestClockService(nodeClock) + clockService ); res.add(new Node(nodeName, clusterService, raftManager, metaStorageManager, placementDriverManager)); diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java index b7cda99359..f586f89e0b 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java @@ -63,6 +63,7 @@ import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -191,6 +192,8 @@ public class PlacementDriverManagerTest extends BasePlacementDriverTest { var storage = new SimpleInMemoryKeyValueStorage(nodeName); + ClockService clockService = new TestClockService(nodeClock); + metaStorageManager = new MetaStorageManagerImpl( clusterService, cmgManager, @@ -200,7 +203,9 @@ public class PlacementDriverManagerTest extends BasePlacementDriverTest { nodeClock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration + metaStorageConfiguration, + raftConfiguration.retryTimeout(), + completedFuture(clockService::maxClockSkewMillis) ); placementDriverManager = new PlacementDriverManager( @@ -212,7 +217,7 @@ public class PlacementDriverManagerTest extends BasePlacementDriverTest { logicalTopologyService, raftManager, topologyAwareRaftGroupServiceFactory, - new TestClockService(nodeClock) + clockService ); ComponentContext componentContext = new ComponentContext(); diff --git a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java index 01e8aebd08..2913279d95 100644 --- a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java +++ b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java @@ -107,7 +107,7 @@ public abstract class ItAbstractListenerSnapshotTest<T extends RaftGroupListener private ScheduledExecutorService executor; @InjectConfiguration - private RaftConfiguration raftConfiguration; + protected RaftConfiguration raftConfiguration; /** * Create executor for raft group services. diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java index 4471754bff..ee6af33877 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.configuration; import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toUnmodifiableList; +import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; @@ -209,7 +211,9 @@ public class ItDistributedConfigurationPropertiesTest extends BaseIgniteAbstract clock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration + metaStorageConfiguration, + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) ); deployWatchesFut = metaStorageManager.deployWatches(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java index e6cc4ba817..04cdc3d132 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.configuration.storage; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; @@ -181,7 +183,9 @@ public class ItDistributedConfigurationStorageTest extends BaseIgniteAbstractTes clock, topologyAwareRaftGroupServiceFactory, new NoOpMetricManager(), - metaStorageConfiguration + metaStorageConfiguration, + raftConfiguration.retryTimeout(), + completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS) ); deployWatchesFut = metaStorageManager.deployWatches(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 80051692c6..e85b913e65 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -417,6 +417,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { InvokeInterceptor metaStorageInvokeInterceptor = metaStorageInvokeInterceptorByNode.get(idx); + CompletableFuture<LongSupplier> maxClockSkewFuture = new CompletableFuture<>(); + var metaStorageMgr = new MetaStorageManagerImpl( clusterSvc, cmgManager, @@ -426,7 +428,9 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { hybridClock, topologyAwareRaftGroupServiceFactory, metricManager, - metaStorageConfiguration + metaStorageConfiguration, + raftConfiguration.retryTimeout(), + maxClockSkewFuture ) { @Override public CompletableFuture<Boolean> invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure) { @@ -464,12 +468,15 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { SchemaSynchronizationConfiguration schemaSyncConfiguration = clusterConfigRegistry.getConfiguration( SchemaSynchronizationConfiguration.KEY ); + ClockService clockService = new ClockServiceImpl( hybridClock, clockWaiter, () -> schemaSyncConfiguration.maxClockSkew().value() ); + maxClockSkewFuture.complete(clockService::maxClockSkewMillis); + var placementDriverManager = new PlacementDriverManager( name, metaStorageMgr, diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index e966a158e3..80b612e8c0 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -576,6 +576,8 @@ public class IgniteImpl implements Ignite { raftGroupEventsClientListener ); + CompletableFuture<LongSupplier> maxClockSkewMillisFuture = new CompletableFuture<>(); + metaStorageMgr = new MetaStorageManagerImpl( clusterSvc, cmgMgr, @@ -584,7 +586,9 @@ public class IgniteImpl implements Ignite { new RocksDbKeyValueStorage(name, workDir.resolve(METASTORAGE_DB_PATH), failureProcessor), clock, topologyAwareRaftGroupServiceFactory, - metricManager + metricManager, + raftConfiguration.retryTimeout(), + maxClockSkewMillisFuture ); this.cfgStorage = new DistributedConfigurationStorage(name, metaStorageMgr); @@ -606,6 +610,8 @@ public class IgniteImpl implements Ignite { clockService = new ClockServiceImpl(clock, clockWaiter, new SameValueLongSupplier(() -> schemaSyncConfig.maxClockSkew().value())); + maxClockSkewMillisFuture.complete(clockService::maxClockSkewMillis); + Consumer<LongFunction<CompletableFuture<?>>> registry = c -> metaStorageMgr.registerRevisionUpdateListener(c::apply); placementDriverMgr = new PlacementDriverManager( diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 9306da8ddd..71afc3cbad 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.rebalance; import static java.util.Collections.reverse; import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toSet; @@ -33,6 +34,7 @@ import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUt import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn; @@ -1101,7 +1103,9 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { hybridClock, topologyAwareRaftGroupServiceFactory, metricManager, - metaStorageConfiguration + metaStorageConfiguration, + raftConfiguration.retryTimeout(), + completedFuture(() -> DEFAULT_MAX_CLOCK_SKEW_MS) ); var placementDriver = new TestPlacementDriver(() -> PRIMARY_FILTER.apply(clusterService.topologyService().allMembers())); @@ -1125,7 +1129,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { ClockService clockService = new ClockServiceImpl( hybridClock, clockWaiter, - () -> TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS + () -> DEFAULT_MAX_CLOCK_SKEW_MS ); TransactionInflights transactionInflights = new TransactionInflights(placementDriver, clockService);