This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 9419272  IGNITE-11617 New exchange coordinator skips client fast reply for previous exchange - Fixes #6332.
9419272 is described below

commit 9419272a052db54081699d82aa9197496de8feb5
Author: Ivan Rakov <ira...@apache.org>
AuthorDate: Mon Mar 25 17:21:18 2019 +0300

    IGNITE-11617 New exchange coordinator skips client fast reply for previous exchange - Fixes #6332.
---
 .../preloader/GridDhtPartitionsExchangeFuture.java |   4 +
 .../ClientFastReplyCoordinatorFailureTest.java     | 141 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite6.java   |   2 +
 3 files changed, 147 insertions(+)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d6e0bf8..2aa28cf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -866,6 +866,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 case NONE: {
                     initTopologies();
 
+                    synchronized (mux) {
+                        state = ExchangeLocalState.DONE;
+                    }
+
                     onDone(topVer);
 
                     break;
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientFastReplyCoordinatorFailureTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientFastReplyCoordinatorFailureTest.java
new file mode 100644
index 0000000..97958dc
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientFastReplyCoordinatorFailureTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Covers the race between a client join and an immediately following change of the exchange coordinator.
+ */
+public class ClientFastReplyCoordinatorFailureTest extends 
GridCommonAbstractTest {
+    /** IP finder shared by the discovery SPI of every node started in this test. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Mask for the low bits of a node ID; a node whose masked bits are zero is treated as the old coordinator. */
+    public static final int OLD_CRD_BITS = 0xFFFF;
+
+    /** Latch released when the client's single message to the old coordinator is about to be blocked. */
+    private final CountDownLatch clientSingleMesssageLatch = new 
CountDownLatch(1);
+
+    /** Latch released when the new server's single message to the old coordinator is about to be blocked. */
+    private final CountDownLatch newSrvSingleMesssageLatch = new 
CountDownLatch(1);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (igniteInstanceName.contains("client"))
+            cfg.setClientMode(true);
+
+        cfg.setFailureHandler(new StopNodeOrHaltFailureHandler());
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        TestRecordingCommunicationSpi commSpi = new 
TestRecordingCommunicationSpi();
+
+        // Clients and the server with index 3 block their single messages addressed to the old coordinator.
+        if (igniteInstanceName.contains("client")) {
+            commSpi.blockMessages(new IgniteBiPredicate<ClusterNode, 
Message>() {
+                @Override public boolean apply(ClusterNode node, Message msg) {
+                    if (msg instanceof GridDhtPartitionsSingleMessage &&
+                        (node.id().getLeastSignificantBits() & OLD_CRD_BITS) 
== 0) {
+                        info("Going to block message [node=" + node + ", msg=" 
+ msg + ']');
+
+                        clientSingleMesssageLatch.countDown(); // Lets the test thread waiting on this latch proceed.
+
+                        return true;
+                    }
+
+                    return false;
+                }
+            });
+        }
+        else if (getTestIgniteInstanceName(3).equals(igniteInstanceName)) {
+            commSpi.blockMessages(new IgniteBiPredicate<ClusterNode, 
Message>() {
+                @Override public boolean apply(ClusterNode node, Message msg) {
+                    if (msg instanceof GridDhtPartitionsSingleMessage &&
+                        (node.id().getLeastSignificantBits() & OLD_CRD_BITS) 
== 0L) {
+                        info("Going to block message [node=" + node + ", msg=" 
+ msg + ']');
+
+                        newSrvSingleMesssageLatch.countDown(); // Lets the test thread waiting on this latch proceed.
+
+                        return true;
+                    }
+
+                    return false;
+                }
+            });
+        }
+
+        cfg.setCommunicationSpi(commSpi);
+
+        return cfg;
+    }
+
+    /**
+     * Stops all grids started by the test.
+     */
+    @After
+    public void cleanUp() {
+        stopAllGrids();
+    }
+
+    /**
+     * Checks that the new coordinator responds to a client's single partitions message after the old one fails.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testClientFastReply() throws Exception {
+        startGrids(3);
+
+        awaitPartitionMapExchange();
+
+        // Client join hangs on its local join exchange: its single message to the old coordinator is blocked.
+        IgniteInternalFuture<Ignite> startFut = GridTestUtils.runAsync(() -> 
startGrid("client-1"));
+
+        clientSingleMesssageLatch.await();
+
+        // Start of the server with index 3 hangs as well: its single message to the old coordinator is blocked.
+        IgniteInternalFuture<IgniteEx> srvStartFut = GridTestUtils.runAsync(() 
-> startGrid(3));
+
+        newSrvSingleMesssageLatch.await();
+
+        stopGrid(0); // Kill the old coordinator; nothing in this test unblocks the held messages.
+
+        srvStartFut.get(); // Both pending joins are expected to complete after the coordinator change.
+
+        startFut.get();
+    }
+}
\ No newline at end of file
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 444152d..6b1d254 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import org.apache.ignite.internal.processors.cache.CacheNoAffinityExchangeTest;
+import 
org.apache.ignite.internal.processors.cache.ClientFastReplyCoordinatorFailureTest;
 import 
org.apache.ignite.internal.processors.cache.PartitionedAtomicCacheGetsDistributionTest;
 import 
org.apache.ignite.internal.processors.cache.PartitionedTransactionalOptimisticCacheGetsDistributionTest;
 import 
org.apache.ignite.internal.processors.cache.PartitionedTransactionalPessimisticCacheGetsDistributionTest;
@@ -81,6 +82,7 @@ public class IgniteCacheTestSuite6 {
 
         GridTestUtils.addTestIfNeeded(suite, CacheExchangeMergeTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
ExchangeMergeStaleServerNodesTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
ClientFastReplyCoordinatorFailureTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, TxRollbackOnTimeoutTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
TxRollbackOnTimeoutNoDeadlockDetectionTest.class, ignoredTests);

Reply via email to