This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 71dbf16 IGNITE-13976 Fixed inconsistency in WAL mode state when changing WAL state on unstable topology - Fixes #8643. 71dbf16 is described below commit 71dbf16cd23af91f25e65824a73d2cc438747b30 Author: Sergey Chugunov <sergey.chugu...@gmail.com> AuthorDate: Mon Apr 12 16:55:20 2021 +0300 IGNITE-13976 Fixed inconsistency in WAL mode state when changing WAL state on unstable topology - Fixes #8643. Signed-off-by: Ilya Kasnacheev <ilya.kasnach...@gmail.com> --- .../internal/processors/cache/WalStateManager.java | 42 +++-- .../wal/WalEnableDisableWithNodeShutdownTest.java | 46 +---- .../wal/WalEnableDisableWithRestartsTest.java | 203 +++++++++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite.java | 6 +- 4 files changed, 240 insertions(+), 57 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java index a301831..81f1cfd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java @@ -204,27 +204,29 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { synchronized (mux) { // Process top pending requests. for (CacheGroupDescriptor grpDesc : cacheProcessor().cacheGroupDescriptors().values()) { - WalStateProposeMessage msg = grpDesc.nextWalChangeRequest(); + CacheGroupContext cctx = cacheProcessor().cacheGroup(grpDesc.groupId()); - if (msg != null) { - if (log.isDebugEnabled()) - log.debug("Processing WAL state message on start: " + msg); + if (cctx != null) + cctx.globalWalEnabled(grpDesc.walEnabled()); - boolean enabled = grpDesc.walEnabled(); + for (WalStateProposeMessage msg : grpDesc.walChangeRequests()) { + if (msg != null) { + if (log.isDebugEnabled()) + log.debug("Processing WAL state message on start: " + msg); - WalStateResult res; + boolean enabled = grpDesc.walEnabled(); - if (F.eq(enabled, msg.enable())) - res = new WalStateResult(msg, false); - else { - res = new WalStateResult(msg, true); + WalStateResult res; - grpDesc.walEnabled(!enabled); - } + if (F.eq(enabled, msg.enable())) + res = new WalStateResult(msg, false); + else + res = new WalStateResult(msg, true); - initialRess.add(res); + initialRess.add(res); - addResult(res); + addResult(res); + } } } } @@ -241,9 +243,19 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { return; synchronized (mux) { - for (WalStateResult res : initialRess) + for (WalStateResult res : initialRess) { onCompletedLocally(res); + if (res.changed()) { + WalStateProposeMessage propMsg = res.message(); + + CacheGroupContext grpCtx = cctx.cache().cacheGroup(propMsg.groupId()); + + if (grpCtx != null) + grpCtx.globalWalEnabled(propMsg.enable()); + } + } + initialRess.clear(); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalEnableDisableWithNodeShutdownTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalEnableDisableWithNodeShutdownTest.java index 6cae455..ab90b3f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalEnableDisableWithNodeShutdownTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalEnableDisableWithNodeShutdownTest.java @@ -18,9 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence.wal; import java.util.LinkedList; import java.util.List; -import javax.cache.configuration.CompleteConfiguration; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheAtomicityMode; @@ -52,10 +50,10 @@ public class WalEnableDisableWithNodeShutdownTest extends GridCommonAbstractTest private static final String CACHE_NAME_2 = "MY_CACHE_2"; /** */ - private static final int CYCLES = 2; + private static final int CYCLES = 5; /** */ - public static final int NODES = 4; + public static final int NODES = 3; /** */ public static final int WAIT_MILLIS = 150; @@ -108,18 +106,14 @@ public class WalEnableDisableWithNodeShutdownTest extends GridCommonAbstractTest } catch (IgniteException ex) { if (ex.getMessage().contains("Operation result is unknown because nodes reported different results")) { - log.warning("Expected exception thrown", ex); - - recreateCacheCheckValid(client); + log.error(ex.toString(), ex); - return; + fail("WAL is in inconsistent state"); } else throw ex; } } - - fail("Expected exception not thrown"); } /** @@ -174,18 +168,14 @@ public class WalEnableDisableWithNodeShutdownTest extends GridCommonAbstractTest } catch (IgniteException ex) { if (ex.getMessage().contains("Operation result is unknown because nodes reported different results")) { - log.warning("Expected exception thrown", ex); - - recreateCacheCheckValid(client); + log.error(ex.toString(), ex); - return; + fail("WAL is in inconsistent state"); } else throw ex; } } - - fail("Expected exception not thrown"); } /** @@ -248,30 +238,6 @@ public class WalEnableDisableWithNodeShutdownTest extends GridCommonAbstractTest } /** */ - private void recreateCacheCheckValid(Ignite client) { - IgniteCache c = client.cache(CACHE_NAME); - - CacheConfiguration ccfg = new CacheConfiguration( - (CompleteConfiguration)c.getConfiguration(CacheConfiguration.class)); - - c.destroy(); - - c = client.createCache(ccfg); - - assertTrue(client.cluster().isWalEnabled(CACHE_NAME)); - - c.put(1, "foo"); - - client.cluster().disableWal(CACHE_NAME); - - c.put(2, "bar"); - - client.cluster().enableWal(CACHE_NAME); - - c.put(1, "baz"); - } - - /** */ private Ignite startNodeWithMaintenance(String consistentId) throws Exception { Ignite node; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalEnableDisableWithRestartsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalEnableDisableWithRestartsTest.java new file mode 100644 index 0000000..d84a05c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalEnableDisableWithRestartsTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.util.LinkedList; +import java.util.List; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.persistence.CleanCacheStoresMaintenanceAction; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.maintenance.MaintenanceAction; +import org.apache.ignite.maintenance.MaintenanceRegistry; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.After; +import org.junit.Test; + +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CORRUPTED_DATA_FILES_MNTC_TASK_NAME; + +/** */ +public class WalEnableDisableWithRestartsTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE_NAME = "MY_CACHE"; + + /** */ + private static final String CACHE_NAME_2 = "MY_CACHE_2"; + + /** */ + private static final int CYCLES = 3; + + /** */ + public static final int NODES = 4; + + /** */ + private static volatile boolean shutdown; + + /** */ + private static volatile boolean failure; + + /** */ + @Test + public void test() throws Exception { + failure = false; + shutdown = false; + + LinkedList<Ignite> nodes = new LinkedList<>(); + + for (int i = 0; i < NODES; i++) + nodes.add(Ignition.start(igniteCfg(false, "server_" + i))); + + nodes.getFirst().active(true); + + Ignite client = Ignition.start(igniteCfg(true, "client")); + + new Thread(new Runnable() { + public void run() { + try { + for (int i = 0; i < CYCLES; i++) { + System.err.println("*** CYCLE " + i); + + client.cluster().disableWal(CACHE_NAME); + + Thread.sleep(800); + + client.cluster().enableWal(CACHE_NAME); + + Thread.sleep(800); + } + } + catch (IgniteException ex) { + if (ex.getMessage().contains("Operation result is unknown because nodes reported different results")) { + log.error("TEST FAILED", ex); + + failure = true; + } + } + catch (InterruptedException ex) { + return; + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + finally { + shutdown = true; + } + } + }).start(); + + while (!shutdown) { + Thread.sleep(1_000); + + Ignite ignite = nodes.removeFirst(); + + String consistentId = (String) ignite.cluster().localNode().consistentId(); + + ignite.close(); + + Thread.sleep(1_000); + + nodes.add(startNodeWithMaintenance(consistentId)); + } + + assertFalse(failure); + } + + /** */ + @After + public void cleanup() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** */ + private Ignite startNodeWithMaintenance(String consistentId) throws Exception { + Ignite node; + + try { + node = Ignition.start(igniteCfg(false, consistentId)); + } + catch (Exception ex) { + if (!X.hasCause(ex, "Cache groups with potentially corrupted partition files", IgniteException.class)) + throw ex; + + node = Ignition.start(igniteCfg(false, consistentId)); + + node.compute().run(new IgniteRunnable() { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @Override public void run() { + MaintenanceRegistry mntcRegistry = ((IgniteEx) ignite).context().maintenanceRegistry(); + + List<MaintenanceAction<?>> actions = mntcRegistry + .actionsForMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME); + + actions.stream() + .filter(a -> a.name().equals(CleanCacheStoresMaintenanceAction.ACTION_NAME)).findFirst() + .get().execute(); + + mntcRegistry.unregisterMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME); + } + }); + + node.close(); + + node = Ignition.start(igniteCfg(false, consistentId)); + } + + return node; + } + + /** */ + private IgniteConfiguration igniteCfg(boolean client, String name) throws Exception { + IgniteConfiguration igniteCfg = getConfiguration(name); + + igniteCfg.setConsistentId(name); + + igniteCfg.setClientMode(client); + + CacheConfiguration configuration = new CacheConfiguration(CACHE_NAME); + configuration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + configuration.setBackups(0); + configuration.setCacheMode(CacheMode.PARTITIONED); + + CacheConfiguration configuration2 = new CacheConfiguration(CACHE_NAME_2); + configuration2.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + configuration2.setBackups(0); + configuration2.setCacheMode(CacheMode.PARTITIONED); + + igniteCfg.setCacheConfiguration(configuration, configuration2); + + igniteCfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setMaxSize(1 * 1024L * 1024 * 1024).setPersistenceEnabled(true))); + + return igniteCfg; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java index 6034074..73b4880 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java @@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRing import org.apache.ignite.internal.processors.cache.persistence.wal.SysPropWalDeltaConsistencyTest; import org.apache.ignite.internal.processors.cache.persistence.wal.WalArchiveConsistencyTest; import org.apache.ignite.internal.processors.cache.persistence.wal.WalEnableDisableWithNodeShutdownTest; +import org.apache.ignite.internal.processors.cache.persistence.wal.WalEnableDisableWithRestartsTest; import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAwareTest; import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationPersistentTest; import org.apache.ignite.internal.processors.database.IgniteDbDynamicCacheSelfTest; @@ -135,10 +136,11 @@ public class IgnitePdsTestSuite { // Binary meta tests. GridTestUtils.addTestIfNeeded(suite, IgnitePdsCacheObjectBinaryProcessorOnDiscoveryTest.class, ignoredTests); - GridTestUtils.addTestIfNeeded(suite, WalEnableDisableWithNodeShutdownTest.class, ignoredTests); - GridTestUtils.addTestIfNeeded(suite, SegmentAwareTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, WalEnableDisableWithNodeShutdownTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, WalEnableDisableWithRestartsTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, WalArchiveConsistencyTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, RestorePartitionStateTest.class, ignoredTests);