This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 0e6b97e IGNITE-12470 Pme-free switch feature should be deactivatable (#7304) 0e6b97e is described below commit 0e6b97edc1d6a1faba5efa10d69e6eb69c78ae24 Author: Anton Vinogradov <a...@apache.org> AuthorDate: Mon Jan 27 10:07:57 2020 +0300 IGNITE-12470 Pme-free switch feature should be deactivatable (#7304) Signed-off-by: Anton Vinogradov <a...@apache.org> --- .../org/apache/ignite/IgniteSystemProperties.java | 3 + .../org/apache/ignite/internal/IgniteFeatures.java | 5 + .../internal/processors/cache/ExchangeContext.java | 17 +- .../preloader/GridDhtPartitionsExchangeFuture.java | 2 +- .../distributed/GridExchangeFreeSwitchTest.java | 208 +++++++++++++++------ 5 files changed, 180 insertions(+), 55 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index b474eb9..22b998e 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -207,6 +207,9 @@ public final class IgniteSystemProperties { /** */ public static final String IGNITE_EXCHANGE_MERGE_DELAY = "IGNITE_EXCHANGE_MERGE_DELAY"; + /** PME-free switch explicitly disabled. */ + public static final String IGNITE_PME_FREE_SWITCH_DISABLED = "IGNITE_PME_FREE_SWITCH_DISABLED"; + /** * Name of the system property defining name of command line program. 
*/ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java index 956d72f..fa6bd0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java @@ -23,6 +23,8 @@ import org.apache.ignite.internal.managers.encryption.GridEncryptionManager; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_PME_FREE_SWITCH_DISABLED; +import static org.apache.ignite.IgniteSystemProperties.getBoolean; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_FEATURES; /** @@ -166,6 +168,9 @@ public enum IgniteFeatures { final BitSet set = new BitSet(); for (IgniteFeatures value : IgniteFeatures.values()) { + if (value == PME_FREE_SWITCH && getBoolean(IGNITE_PME_FREE_SWITCH_DISABLED)) + continue; + final int featureId = value.getFeatureId(); assert !set.get(featureId) : "Duplicate feature ID found for [" + value + "] having same ID [" diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java index e101f1a..f9e9376 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.HashSet; import java.util.Set; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; @@ -38,6 +39,9 @@ public class ExchangeContext { /** */ public static final String IGNITE_EXCHANGE_COMPATIBILITY_VER_1 = "IGNITE_EXCHANGE_COMPATIBILITY_VER_1"; + /** Logger. */ + private final IgniteLogger log; + /** Cache groups to request affinity for during local join exchange. */ private Set<Integer> requestGrpsAffOnJoin; @@ -57,16 +61,25 @@ public class ExchangeContext { private final boolean compatibilityNode = getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, false); /** + * @param cctx Context. * @param crd Coordinator flag. * @param fut Exchange future. */ - public ExchangeContext(boolean crd, GridDhtPartitionsExchangeFuture fut) { + public ExchangeContext(GridCacheSharedContext<?, ?> cctx, boolean crd, GridDhtPartitionsExchangeFuture fut) { + log = cctx.logger(getClass()); + int protocolVer = exchangeProtocolVersion(fut.firstEventCache().minimumNodeVersion()); + boolean allNodesSupportsPmeFreeSwitch = allNodesSupports(fut.firstEventCache().allNodes(), PME_FREE_SWITCH); + + if (!allNodesSupportsPmeFreeSwitch) + log.warning("Current topology does not support the PME-free switch. 
Please check all nodes support" + + " this feature and it was not explicitly disabled by IGNITE_PME_FREE_SWITCH_DISABLED JVM option."); + if (!compatibilityNode && fut.wasRebalanced() && fut.isBaselineNodeFailed() && - allNodesSupports(fut.firstEventCache().allNodes(), PME_FREE_SWITCH)) { + allNodesSupportsPmeFreeSwitch) { exchangeFreeSwitch = true; merge = false; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 890e28b..40055f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -771,7 +771,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean crdNode = crd != null && crd.isLocal(); - exchCtx = new ExchangeContext(crdNode, this); + exchCtx = new ExchangeContext(cctx, crdNode, this); cctx.exchange().exchangerBlockingSectionBegin(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java index 7ca5fff..cbc2a49 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java @@ -31,6 +31,7 @@ import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.affinity.AffinityFunctionContext; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; 
import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; @@ -47,7 +48,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -55,7 +56,11 @@ import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.Nullable; import org.junit.Test; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_PME_FREE_SWITCH_DISABLED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.IgniteFeatures.PME_FREE_SWITCH; +import static org.apache.ignite.internal.IgniteFeatures.allNodesSupports; +import static org.apache.ignite.internal.IgniteFeatures.nodeSupports; /** * @@ -81,16 +86,13 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest { cfg.setCacheConfiguration(cacheC != null ? 
cacheC.apply(igniteInstanceName) : new CacheConfiguration[] {cacheConfiguration()}); + cfg.setClusterStateOnStart(ClusterState.INACTIVE); + DataStorageConfiguration dsCfg = new DataStorageConfiguration(); DataRegionConfiguration drCfg = new DataRegionConfiguration(); - if (persistence) { - drCfg.setPersistenceEnabled(true); - - cfg.setActiveOnStart(false); - cfg.setAutoActivationEnabled(false); - } + drCfg.setPersistenceEnabled(persistence); dsCfg.setDefaultDataRegionConfiguration(drCfg); @@ -135,24 +137,54 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest { } /** - * Checks Partition Exchange happen in case of baseline auto-adjust (in-memory cluster). It's not possible to - * perform switch since primaries may change. + * Checks PME happen in case of baseline auto-adjust (in-memory cluster). It's not possible to perform switch since + * primaries may change. */ @Test public void testNonBaselineNodeLeftOnFullyRebalancedCluster() throws Exception { - testNodeLeftOnFullyRebalancedCluster(); + testNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode.NONE); } /** - * Checks Partition Exchange is absent in case of fixed baseline. It's possible to perform switch since primaries - * can't change. + * Checks PME is absent in case of fixed baseline. It's possible to perform switch since primaries can't change. */ @Test public void testBaselineNodeLeftOnFullyRebalancedCluster() throws Exception { + testBaselineNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode.NONE); + } + + /** + * Checks PME is absent/present with all nodes except first one supports PME-free switch. + */ + @Test + public void testBaselineNodeLeftOnFullyRebalancedClusterPmeFreeDisabledFirstNode() throws Exception { + testBaselineNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode.FIRST); + } + + /** + * Checks PME is absent/present with all nodes except middle one supports PME-free switch.
+ */ + @Test + public void testBaselineNodeLeftOnFullyRebalancedClusterPmeFreeDisabledMiddleNode() throws Exception { + testBaselineNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode.MIDDLE); + } + + /** + * Checks PME is absent/present with all nodes except last one supports PME-free switch. + */ + @Test + public void testBaselineNodeLeftOnFullyRebalancedClusterPmeFreeDisabledLastNode() throws Exception { + testBaselineNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode.LAST); + } + + /** + * Checks PME is absent/present in case of persistence enabled. + */ + private void testBaselineNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode order) throws Exception { persistence = true; try { - testNodeLeftOnFullyRebalancedCluster(); + testNodeLeftOnFullyRebalancedCluster(order); } finally { persistence = false; @@ -160,36 +192,75 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest { } /** + * Starts node with PME-free feature explicitly disabled. + */ + private void startNodeWithPmeFreeSwitchDisabled() throws Exception { + try { + System.setProperty(IGNITE_PME_FREE_SWITCH_DISABLED, "true"); + + Ignite ignite = startGrid(G.allGrids().size()); + + assertFalse(nodeSupports(ignite.cluster().localNode(), PME_FREE_SWITCH)); + } + finally { + System.clearProperty(IGNITE_PME_FREE_SWITCH_DISABLED); + } + } + + /** * Checks node left PME absent/present on fully rebalanced topology (Latest PME == LAA). 
*/ - private void testNodeLeftOnFullyRebalancedCluster() throws Exception { + private void testNodeLeftOnFullyRebalancedCluster(PmeFreeSwitchDisabledNode disabled) throws Exception { int nodes = 10; - Ignite ignite = startGridsMultiThreaded(nodes, true); + switch (disabled) { + case FIRST: + startNodeWithPmeFreeSwitchDisabled(); - ignite.cluster().active(true); + startGridsMultiThreaded(1, nodes - 1); - AtomicLong cnt = new AtomicLong(); + break; - for (int i = 0; i < nodes; i++) { - TestRecordingCommunicationSpi spi = - (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi(); + case MIDDLE: + startGridsMultiThreaded(0, (nodes / 2) - 1); - spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { - @Override public boolean apply(ClusterNode node, Message msg) { - if (msg.getClass().equals(GridDhtPartitionsSingleMessage.class) && - ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null) - cnt.incrementAndGet(); + startNodeWithPmeFreeSwitchDisabled(); - if (!persistence) - return false; + int started = G.allGrids().size(); - return msg.getClass().equals(GridDhtPartitionsSingleMessage.class) || - msg.getClass().equals(GridDhtPartitionsFullMessage.class); - } - }); + startGridsMultiThreaded(started, nodes - started); + + break; + + case LAST: + startGridsMultiThreaded(0, nodes - 1); + + startNodeWithPmeFreeSwitchDisabled(); + + break; + + case NONE: + startGridsMultiThreaded(0, nodes); + + break; + + default: + throw new UnsupportedOperationException(); } + assertEquals(nodes, G.allGrids().size()); + + assertEquals(ClusterState.INACTIVE, grid(0).cluster().state()); + + grid(0).cluster().state(ClusterState.ACTIVE); + + awaitPartitionMapExchange(); + + AtomicLong singleCnt = new AtomicLong(); + AtomicLong fullCnt = new AtomicLong(); + + startPmeMessagesCounting(nodes, singleCnt, fullCnt); + Random r = new Random(); while (nodes > 1) { @@ -197,13 +268,41 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest { 
awaitPartitionMapExchange(true, true, null, true); - assertEquals(persistence ? 0 /*PME absent*/ : (nodes - 1) /*regular PME*/, cnt.get()); - IgniteEx alive = (IgniteEx)G.allGrids().get(0); assertTrue(alive.context().cache().context().exchange().lastFinishedFuture().rebalanced()); - cnt.set(0); + boolean pmeFreeSwitch = persistence && allNodesSupports(alive.cluster().nodes(), PME_FREE_SWITCH); + + assertEquals(pmeFreeSwitch ? 0 : (nodes - 1), singleCnt.get()); + assertEquals(pmeFreeSwitch ? 0 : (nodes - 1), fullCnt.get()); + + singleCnt.set(0); + fullCnt.set(0); + } + } + + /** + * @param nodes Nodes. + * @param signleCnt Counter for GridDhtPartitionsSingleMessage. + * @param fullCnt Counter for GridDhtPartitionsFullMessage. + */ + private void startPmeMessagesCounting(int nodes, AtomicLong signleCnt, AtomicLong fullCnt) { + for (int i = 0; i < nodes; i++) { + TestRecordingCommunicationSpi spi = + (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi(); + + spi.closure(new IgniteBiInClosure<ClusterNode, Message>() { + @Override public void apply(ClusterNode node, Message msg) { + if (msg.getClass().equals(GridDhtPartitionsSingleMessage.class) && + ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null) + signleCnt.incrementAndGet(); + + if (msg.getClass().equals(GridDhtPartitionsFullMessage.class) && + ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null) + fullCnt.incrementAndGet(); + } + }); } } @@ -258,25 +357,12 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest { int nodes = 4; - startGridsMultiThreaded(nodes, true).cluster().active(true); - - AtomicLong cnt = new AtomicLong(); + startGridsMultiThreaded(nodes); - for (int i = 0; i < nodes; i++) { - TestRecordingCommunicationSpi spi = - (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi(); + AtomicLong singleCnt = new AtomicLong(); + AtomicLong fullCnt = new AtomicLong(); - spi.blockMessages(new IgniteBiPredicate<ClusterNode, 
Message>() { - @Override public boolean apply(ClusterNode node, Message msg) { - if (msg.getClass().equals(GridDhtPartitionsSingleMessage.class) && - ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null) - cnt.incrementAndGet(); - - return msg.getClass().equals(GridDhtPartitionsSingleMessage.class) || - msg.getClass().equals(GridDhtPartitionsFullMessage.class); - } - }); - } + startPmeMessagesCounting(nodes, singleCnt, fullCnt); Random r = new Random(); @@ -457,7 +543,8 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest { assertEquals(nodes - 1, pmeFreeCnt); - assertEquals(0, cnt.get()); + assertEquals(0, singleCnt.get()); + assertEquals(0, fullCnt.get()); } finally { persistence = false; @@ -599,4 +686,21 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest { return res; } } + + /** + * Specifies node to start with IGNITE_PME_FREE_SWITCH_DISABLED JVM option. + */ + private enum PmeFreeSwitchDisabledNode { + /** First. */ + FIRST, + + /** Middle. */ + MIDDLE, + + /** Last. */ + LAST, + + /** None. */ + NONE + } }