This is an automated email from the ASF dual-hosted git repository. ivandasch pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 6229b46c277 IGNITE-18935 Fix late stopping of TTL workers during deactivation leads to corrupted PDS (#10570) 6229b46c277 is described below commit 6229b46c277707743da4f5fce881450a8a6ca151 Author: Ivan Daschinskiy <ivanda...@apache.org> AuthorDate: Tue Mar 7 11:03:29 2023 +0300 IGNITE-18935 Fix late stopping of TTL workers during deactivation leads to corrupted PDS (#10570) --- modules/compress/pom.xml | 7 + .../processors/cache/GridCacheProcessor.java | 58 ++++-- ...IgnitePdsWithTtlExpirationOnDeactivateTest.java | 222 +++++++++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite.java | 2 + 4 files changed, 268 insertions(+), 21 deletions(-) diff --git a/modules/compress/pom.xml b/modules/compress/pom.xml index f9c4db51f42..c07bb0b7f8a 100644 --- a/modules/compress/pom.xml +++ b/modules/compress/pom.xml @@ -131,6 +131,13 @@ <version>${guava.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${commons.lang3.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 53b29937c6f..6cb8c84b5b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2744,9 +2744,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { // Wait until all evictions are finished. 
grpsToStop.forEach(t -> sharedCtx.evict().onCacheGroupStopped(t.get1())); - if (!exchActions.cacheStopRequests().isEmpty()) - removeOffheapListenerAfterCheckpoint(grpsToStop); - Map<Integer, List<ExchangeActions.CacheActionData>> cachesToStop = exchActions.cacheStopRequests().stream() .collect(groupingBy(action -> action.descriptor().groupId())); @@ -2763,31 +2760,50 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheGroupContext gctx = cacheGrps.get(groupId); - if (gctx != null) - gctx.preloader().pause(); + if (gctx != null) { + final String msg = "Failed to wait for topology update, cache group is stopping."; - try { - if (gctx != null) { - final String msg = "Failed to wait for topology update, cache group is stopping."; + // If snapshot operation in progress we must throw CacheStoppedException + // for correct cache proxy restart. For more details see + // IgniteCacheProxy.cacheException() + gctx.affinity().cancelFutures(new CacheStoppedException(msg)); + } - // If snapshot operation in progress we must throw CacheStoppedException - // for correct cache proxy restart. For more details see - // IgniteCacheProxy.cacheException() - gctx.affinity().cancelFutures(new CacheStoppedException(msg)); - } + for (ExchangeActions.CacheActionData action : cachesToStopByGrp.getValue()) { + context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId()); - for (ExchangeActions.CacheActionData action : cachesToStopByGrp.getValue()) { - context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId()); + stopGateway(action.request()); - stopGateway(action.request()); + String cacheName = action.request().cacheName(); - String cacheName = action.request().cacheName(); + GridCacheAdapter<?, ?> cache = caches.get(cacheName); - // TTL manager has to be unregistered before the checkpointReadLock is acquired. 
- GridCacheAdapter<?, ?> cache = caches.get(cacheName); + if (cache != null) + cache.context().ttl().unregister(); + } - if (cache != null) - cache.context().ttl().unregister(); + return null; + } + ); + + if (!exchActions.cacheStopRequests().isEmpty()) + removeOffheapListenerAfterCheckpoint(grpsToStop); + + doInParallel( + parallelismLvl, + sharedCtx.kernalContext().pools().getSystemExecutorService(), + cachesToStop.entrySet(), + cachesToStopByGrp -> { + Integer groupId = cachesToStopByGrp.getKey(); + + CacheGroupContext gctx = cacheGrps.get(groupId); + + if (gctx != null) + gctx.preloader().pause(); + + try { + for (ExchangeActions.CacheActionData action : cachesToStopByGrp.getValue()) { + String cacheName = action.request().cacheName(); sharedCtx.database().checkpointReadLock(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlExpirationOnDeactivateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlExpirationOnDeactivateTest.java new file mode 100644 index 00000000000..26a82def544 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlExpirationOnDeactivateTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import javax.cache.expiry.AccessedExpiryPolicy; +import javax.cache.expiry.Duration; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.failure.FailureHandler; +import org.apache.ignite.failure.NoOpFailureHandler; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.MvccFeatureChecker; +import org.apache.ignite.testframework.junits.WithSystemProperty; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cluster.ClusterState.INACTIVE; +import static 
org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync; + +/** + * Tests if TTL worker is correctly stopped on deactivation and PDS is not corrupted after restart. + */ +@WithSystemProperty(key = IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, value = "5") +public class IgnitePdsWithTtlExpirationOnDeactivateTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE_NAME_ATOMIC = "expirable-cache-atomic"; + + /** */ + private static final int EXPIRATION_TIMEOUT = 5_000; + + /** */ + private static final String PAYLOAD = RandomStringUtils.randomAlphanumeric(10000); + + /** */ + private static final int WORKLOAD_THREADS_CNT = Runtime.getRuntime().availableProcessors(); + + /** Failure handler triggered flag. */ + private volatile boolean failureHndTriggered; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.EXPIRATION); + + super.beforeTest(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + final IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + DataRegionConfiguration dfltRegion = new DataRegionConfiguration() + .setMaxSize(512 * 1024 * 1024) + .setCheckpointPageBufferSize(64 * 1024 * 1024) + .setPersistenceEnabled(true); + + // Setting MaxWalArchiveSize to a relatively small value leads to frequent checkpoints (too many WAL segments). 
+ cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setWalSegmentSize(8 * 1024 * 1024) + .setMaxWalArchiveSize(16 * 1024 * 1024) + .setCheckpointFrequency(10_000) + .setDefaultDataRegionConfiguration(dfltRegion) + .setWalMode(WALMode.LOG_ONLY)); + + cfg.setCacheConfiguration(getCacheConfiguration(CACHE_NAME_ATOMIC)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected FailureHandler getFailureHandler(String igniteInstanceName) { + return new NoOpFailureHandler() { + @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { + failureHndTriggered = true; + + return super.handle(ignite, failureCtx); + } + }; + } + + /** + * Returns a new ATOMIC cache configuration with the given name, eager TTL and an accessed expiry policy. + * + * @param name Cache name. + * @return Cache configuration. + */ + private CacheConfiguration<?, ?> getCacheConfiguration(String name) { + CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(); + + ccfg.setName(name); + ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, EXPIRATION_TIMEOUT))); + ccfg.setEagerTtl(true); + + ccfg.setAtomicityMode(ATOMIC); + + return ccfg; + } + + /** */ + @Test + public void testStartAfterDeactivateWithTtlExpiring() throws Exception { + IgniteEx srv = startGrid(0); + + srv.cluster().state(ClusterState.ACTIVE); + + IgniteCache<Integer, String> cache = srv.cache(CACHE_NAME_ATOMIC); + + AtomicBoolean timeoutReached = new AtomicBoolean(false); + + AtomicInteger threadId = new AtomicInteger(0); + + IgniteInternalFuture<?> ldrFut = runMultiThreadedAsync(() -> { + int id = threadId.getAndIncrement(); + + int i = 0; + while (!timeoutReached.get()) { + cache.put(id * 1_000_000 + i, PAYLOAD); + i++; + } + }, WORKLOAD_THREADS_CNT, "loader"); + + doSleep(EXPIRATION_TIMEOUT); + timeoutReached.set(true); + ldrFut.get(); + + // Add a listener for the "caches stop" checkpoint that slows down the system pool workers a little.
+ addCheckpointListener(srv, new CheckpointListener() { + @Override public void onMarkCheckpointBegin(Context ctx) { + // No-op. + } + + @Override public void onCheckpointBegin(Context ctx) { + // No-op. + } + + @Override public void beforeCheckpointBegin(Context ctx) { + // No-op. + } + + @Override public void afterCheckpointEnd(Context ctx) { + if ("caches stop".equals(ctx.progress().reason())) { + ExecutorService sysPool = srv.context().pools().getSystemExecutorService(); + try { + sysPool.invokeAll(IntStream.range(0, WORKLOAD_THREADS_CNT).mapToObj(i -> new Callable<Void>() { + @Override public Void call() { + doSleep(EXPIRATION_TIMEOUT); + return null; + } + }).collect(Collectors.toList())); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + }); + + // Deactivate and restart. + srv.cluster().state(INACTIVE); + stopGrid(0); + startGrid(0); + + GridTestUtils.waitForCondition(() -> failureHndTriggered, EXPIRATION_TIMEOUT); + + assertFalse(failureHndTriggered); + } + + /** */ + private void addCheckpointListener(IgniteEx grid, CheckpointListener lsnr) { + GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)grid.context().cache().context() + .database(); + + dbMgr.addCheckpointListener(lsnr); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java index 83321a4d901..9bc0f91915a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSingleNo import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSporadicDataRecordsOnBackupTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheRestoreTest; import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsDataRegionMetricsTest; +import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlExpirationOnDeactivateTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlTest2; import org.apache.ignite.internal.processors.cache.persistence.db.file.DefaultPageSizeBackwardsCompatibilityTest; @@ -116,6 +117,7 @@ public class IgnitePdsTestSuite { GridTestUtils.addTestIfNeeded(suite, IgniteDbPutGetWithCacheStoreTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgnitePdsWithTtlTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgnitePdsWithTtlTest2.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgnitePdsWithTtlExpirationOnDeactivateTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgnitePdsSporadicDataRecordsOnBackupTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteClusterActivateDeactivateTestWithPersistence.class, ignoredTests);