This is an automated email from the ASF dual-hosted git repository. irakov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 9419272 IGNITE-11617 New exchange coordinator skips client fast reply for previous exchange - Fixes #6332. 9419272 is described below commit 9419272a052db54081699d82aa9197496de8feb5 Author: Ivan Rakov <ira...@apache.org> AuthorDate: Mon Mar 25 17:21:18 2019 +0300 IGNITE-11617 New exchange coordinator skips client fast reply for previous exchange - Fixes #6332. --- .../preloader/GridDhtPartitionsExchangeFuture.java | 4 + .../ClientFastReplyCoordinatorFailureTest.java | 141 +++++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite6.java | 2 + 3 files changed, 147 insertions(+) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index d6e0bf8..2aa28cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -866,6 +866,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte case NONE: { initTopologies(); + synchronized (mux) { + state = ExchangeLocalState.DONE; + } + onDone(topVer); break; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientFastReplyCoordinatorFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientFastReplyCoordinatorFailureTest.java new file mode 100644 index 0000000..97958dc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientFastReplyCoordinatorFailureTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.StopNodeOrHaltFailureHandler; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.After; +import org.junit.Test; + +/** + * Covers race with client join and instant successive coordinator change. + */ +public class ClientFastReplyCoordinatorFailureTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Least significant bits of old coordinator's node ID. */ + public static final int OLD_CRD_BITS = 0xFFFF; + + /** Latch that will be triggered after blocking message from client to old coordinator. */ + private final CountDownLatch clientSingleMesssageLatch = new CountDownLatch(1); + + /** Latch that will be triggered after blocking message from new server to old coordinator. */ + private final CountDownLatch newSrvSingleMesssageLatch = new CountDownLatch(1); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + if (igniteInstanceName.contains("client")) + cfg.setClientMode(true); + + cfg.setFailureHandler(new StopNodeOrHaltFailureHandler()); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); + + // Block messages to old coordinator right before killing it. + if (igniteInstanceName.contains("client")) { + commSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + if (msg instanceof GridDhtPartitionsSingleMessage && + (node.id().getLeastSignificantBits() & OLD_CRD_BITS) == 0) { + info("Going to block message [node=" + node + ", msg=" + msg + ']'); + + clientSingleMesssageLatch.countDown(); + + return true; + } + + return false; + } + }); + } + else if (getTestIgniteInstanceName(3).equals(igniteInstanceName)) { + commSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + if (msg instanceof GridDhtPartitionsSingleMessage && + (node.id().getLeastSignificantBits() & OLD_CRD_BITS) == 0L) { + info("Going to block message [node=" + node + ", msg=" + msg + ']'); + + newSrvSingleMesssageLatch.countDown(); + + return true; + } + + return false; + } + }); + } + + cfg.setCommunicationSpi(commSpi); + + return cfg; + } + + /** + * Cleanup after test. + */ + @After + public void cleanUp() { + stopAllGrids(); + } + + /** + * Checks that new coordinator will respond to client single partitions message. + * + * @throws Exception if failed. + */ + @Test + public void testClientFastReply() throws Exception { + startGrids(3); + + awaitPartitionMapExchange(); + + // Client join will be hanging on local join exchange. + IgniteInternalFuture<Ignite> startFut = GridTestUtils.runAsync(() -> startGrid("client-1")); + + clientSingleMesssageLatch.await(); + + // Server start will be blocked. + IgniteInternalFuture<IgniteEx> srvStartFut = GridTestUtils.runAsync(() -> startGrid(3)); + + newSrvSingleMesssageLatch.await(); + + stopGrid(0); + + srvStartFut.get(); + + startFut.get(); + } +} \ No newline at end of file diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java index 444152d..6b1d254 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.ignite.internal.processors.cache.CacheNoAffinityExchangeTest; +import org.apache.ignite.internal.processors.cache.ClientFastReplyCoordinatorFailureTest; import org.apache.ignite.internal.processors.cache.PartitionedAtomicCacheGetsDistributionTest; import org.apache.ignite.internal.processors.cache.PartitionedTransactionalOptimisticCacheGetsDistributionTest; import org.apache.ignite.internal.processors.cache.PartitionedTransactionalPessimisticCacheGetsDistributionTest; @@ -81,6 +82,7 @@ public class IgniteCacheTestSuite6 { GridTestUtils.addTestIfNeeded(suite, CacheExchangeMergeTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, ExchangeMergeStaleServerNodesTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, ClientFastReplyCoordinatorFailureTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, TxRollbackOnTimeoutTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, TxRollbackOnTimeoutNoDeadlockDetectionTest.class, ignoredTests);