This is an automated email from the ASF dual-hosted git repository.

ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6229b46c277 IGNITE-18935 Fix late stopping of TTL workers during 
deactivation leads to corrupted PDS (#10570)
6229b46c277 is described below

commit 6229b46c277707743da4f5fce881450a8a6ca151
Author: Ivan Daschinskiy <ivanda...@apache.org>
AuthorDate: Tue Mar 7 11:03:29 2023 +0300

    IGNITE-18935 Fix late stopping of TTL workers during deactivation leads to 
corrupted PDS (#10570)
---
 modules/compress/pom.xml                           |   7 +
 .../processors/cache/GridCacheProcessor.java       |  58 ++++--
 ...IgnitePdsWithTtlExpirationOnDeactivateTest.java | 222 +++++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite.java      |   2 +
 4 files changed, 268 insertions(+), 21 deletions(-)

diff --git a/modules/compress/pom.xml b/modules/compress/pom.xml
index f9c4db51f42..c07bb0b7f8a 100644
--- a/modules/compress/pom.xml
+++ b/modules/compress/pom.xml
@@ -131,6 +131,13 @@
             <version>${guava.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons.lang3.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 53b29937c6f..6cb8c84b5b0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2744,9 +2744,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         // Wait until all evictions are finished.
         grpsToStop.forEach(t -> 
sharedCtx.evict().onCacheGroupStopped(t.get1()));
 
-        if (!exchActions.cacheStopRequests().isEmpty())
-            removeOffheapListenerAfterCheckpoint(grpsToStop);
-
         Map<Integer, List<ExchangeActions.CacheActionData>> cachesToStop = 
exchActions.cacheStopRequests().stream()
             .collect(groupingBy(action -> action.descriptor().groupId()));
 
@@ -2763,31 +2760,50 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
                     CacheGroupContext gctx = cacheGrps.get(groupId);
 
-                    if (gctx != null)
-                        gctx.preloader().pause();
+                    if (gctx != null) {
+                        final String msg = "Failed to wait for topology 
update, cache group is stopping.";
 
-                    try {
-                        if (gctx != null) {
-                            final String msg = "Failed to wait for topology 
update, cache group is stopping.";
+                        // If snapshot operation in progress we must throw 
CacheStoppedException
+                        // for correct cache proxy restart. For more details 
see
+                        // IgniteCacheProxy.cacheException()
+                        gctx.affinity().cancelFutures(new 
CacheStoppedException(msg));
+                    }
 
-                            // If snapshot operation in progress we must throw 
CacheStoppedException
-                            // for correct cache proxy restart. For more 
details see
-                            // IgniteCacheProxy.cacheException()
-                            gctx.affinity().cancelFutures(new 
CacheStoppedException(msg));
-                        }
+                    for (ExchangeActions.CacheActionData action : 
cachesToStopByGrp.getValue()) {
+                        
context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());
 
-                        for (ExchangeActions.CacheActionData action : 
cachesToStopByGrp.getValue()) {
-                            
context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());
+                        stopGateway(action.request());
 
-                            stopGateway(action.request());
+                        String cacheName = action.request().cacheName();
 
-                            String cacheName = action.request().cacheName();
+                        GridCacheAdapter<?, ?> cache = caches.get(cacheName);
 
-                            // TTL manager has to be unregistered before the 
checkpointReadLock is acquired.
-                            GridCacheAdapter<?, ?> cache = 
caches.get(cacheName);
+                        if (cache != null)
+                            cache.context().ttl().unregister();
+                    }
 
-                            if (cache != null)
-                                cache.context().ttl().unregister();
+                    return null;
+                }
+            );
+
+            if (!exchActions.cacheStopRequests().isEmpty())
+                removeOffheapListenerAfterCheckpoint(grpsToStop);
+
+            doInParallel(
+                parallelismLvl,
+                sharedCtx.kernalContext().pools().getSystemExecutorService(),
+                cachesToStop.entrySet(),
+                cachesToStopByGrp -> {
+                    Integer groupId = cachesToStopByGrp.getKey();
+
+                    CacheGroupContext gctx = cacheGrps.get(groupId);
+
+                    if (gctx != null)
+                        gctx.preloader().pause();
+
+                    try {
+                        for (ExchangeActions.CacheActionData action : 
cachesToStopByGrp.getValue()) {
+                            String cacheName = action.request().cacheName();
 
                             sharedCtx.database().checkpointReadLock();
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlExpirationOnDeactivateTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlExpirationOnDeactivateTest.java
new file mode 100644
index 00000000000..26a82def544
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlExpirationOnDeactivateTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.NoOpFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.MvccFeatureChecker;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static 
org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
+
+/**
+ * Tests if TTL worker is correctly stopped on deactivation and PDS is not 
corrupted after restart.
+ */
+@WithSystemProperty(key = 
IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, value = "5")
+public class IgnitePdsWithTtlExpirationOnDeactivateTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final String CACHE_NAME_ATOMIC = "expirable-cache-atomic";
+
+    /** */
+    private static final int EXPIRATION_TIMEOUT = 5_000;
+
+    /** */
+    private static final String PAYLOAD = 
RandomStringUtils.randomAlphanumeric(10000);
+
+    /** */
+    private static final int WORKLOAD_THREADS_CNT = 
Runtime.getRuntime().availableProcessors();
+
+    /** Failure handler triggered flag. */
+    private volatile boolean failureHndTriggered;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.EXPIRATION);
+
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        final IgniteConfiguration cfg = 
super.getConfiguration(igniteInstanceName);
+
+        DataRegionConfiguration dfltRegion = new DataRegionConfiguration()
+                .setMaxSize(512 * 1024 * 1024)
+                .setCheckpointPageBufferSize(64 * 1024 * 1024)
+                .setPersistenceEnabled(true);
+
+        // Setting MaxWalArchiveSize to a relatively small value leads to 
frequent checkpoints (too many WAL segments).
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                    .setWalSegmentSize(8 * 1024 * 1024)
+                    .setMaxWalArchiveSize(16 * 1024 * 1024)
+                    .setCheckpointFrequency(10_000)
+                    .setDefaultDataRegionConfiguration(dfltRegion)
+                    .setWalMode(WALMode.LOG_ONLY));
+
+        cfg.setCacheConfiguration(getCacheConfiguration(CACHE_NAME_ATOMIC));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected FailureHandler getFailureHandler(String 
igniteInstanceName) {
+        return new NoOpFailureHandler() {
+            @Override protected boolean handle(Ignite ignite, FailureContext 
failureCtx) {
+                failureHndTriggered = true;
+
+                return super.handle(ignite, failureCtx);
+            }
+        };
+    }
+
+    /**
+     * Returns a new cache configuration with the given name, eager TTL, an 
accessed-based expiry policy and {@code ATOMIC} atomicity mode.
+     *
+     * @param name Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<?, ?> getCacheConfiguration(String name) {
+        CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName(name);
+        ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new 
Duration(TimeUnit.MILLISECONDS, EXPIRATION_TIMEOUT)));
+        ccfg.setEagerTtl(true);
+
+        ccfg.setAtomicityMode(ATOMIC);
+
+        return ccfg;
+    }
+
+    /** */
+    @Test
+    public void testStartAfterDeactivateWithTtlExpiring() throws Exception {
+        IgniteEx srv = startGrid(0);
+
+        srv.cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache<Integer, String> cache = srv.cache(CACHE_NAME_ATOMIC);
+
+        AtomicBoolean timeoutReached = new AtomicBoolean(false);
+
+        AtomicInteger threadId = new AtomicInteger(0);
+
+        IgniteInternalFuture<?> ldrFut = runMultiThreadedAsync(() -> {
+            int id = threadId.getAndIncrement();
+
+            int i = 0;
+            while (!timeoutReached.get()) {
+                cache.put(id * 1_000_000 + i, PAYLOAD);
+                i++;
+            }
+        }, WORKLOAD_THREADS_CNT, "loader");
+
+        doSleep(EXPIRATION_TIMEOUT);
+        timeoutReached.set(true);
+        ldrFut.get();
+
+        // Add a checkpoint listener that, after the "caches stop" checkpoint 
ends, keeps the sys pool workers busy for a while.
+        addCheckpointListener(srv, new CheckpointListener() {
+            @Override public void onMarkCheckpointBegin(Context ctx) {
+                // No-op.
+            }
+
+            @Override public void onCheckpointBegin(Context ctx) {
+                // No-op.
+            }
+
+            @Override public void beforeCheckpointBegin(Context ctx) {
+                // No-op.
+            }
+
+            @Override public void afterCheckpointEnd(Context ctx) {
+                if ("caches stop".equals(ctx.progress().reason())) {
+                    ExecutorService sysPool = 
srv.context().pools().getSystemExecutorService();
+                    try {
+                        sysPool.invokeAll(IntStream.range(0, 
WORKLOAD_THREADS_CNT).mapToObj(i -> new Callable<Void>() {
+                            @Override public Void call() {
+                                doSleep(EXPIRATION_TIMEOUT);
+                                return null;
+                            }
+                        }).collect(Collectors.toList()));
+                    }
+                    catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        });
+
+        // Deactivate and restart.
+        srv.cluster().state(INACTIVE);
+        stopGrid(0);
+        startGrid(0);
+
+        GridTestUtils.waitForCondition(() -> failureHndTriggered, 
EXPIRATION_TIMEOUT);
+
+        assertFalse(failureHndTriggered);
+    }
+
+    /** */
+    private void addCheckpointListener(IgniteEx grid, CheckpointListener lsnr) 
{
+        GridCacheDatabaseSharedManager dbMgr = 
(GridCacheDatabaseSharedManager)grid.context().cache().context()
+                .database();
+
+        dbMgr.addCheckpointListener(lsnr);
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 83321a4d901..9bc0f91915a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -35,6 +35,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSingleNo
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSporadicDataRecordsOnBackupTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheRestoreTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsDataRegionMetricsTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlExpirationOnDeactivateTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlTest2;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.file.DefaultPageSizeBackwardsCompatibilityTest;
@@ -116,6 +117,7 @@ public class IgnitePdsTestSuite {
         GridTestUtils.addTestIfNeeded(suite, 
IgniteDbPutGetWithCacheStoreTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsWithTtlTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsWithTtlTest2.class, 
ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
IgnitePdsWithTtlExpirationOnDeactivateTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
IgnitePdsSporadicDataRecordsOnBackupTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, 
IgniteClusterActivateDeactivateTestWithPersistence.class, ignoredTests);

Reply via email to