IGNITE-9501 Exclude newly joined nodes from exchange latch. Fixes #4769.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1573f452
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1573f452
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1573f452

Branch: refs/heads/ignite-gg-14206
Commit: 1573f452b92937d539ac9344766672029c50bacb
Parents: 1858acf
Author: Pavel Kovalenko <jokse...@gmail.com>
Authored: Tue Sep 25 12:58:14 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Tue Sep 25 12:58:14 2018 +0300

----------------------------------------------------------------------
 .../GridDhtPartitionsExchangeFuture.java        | 87 ++++++++++-------
 .../preloader/latch/ExchangeLatchManager.java   | 98 ++++++++++++++++----
 ...ExchangeLatchManagerCoordinatorFailTest.java | 34 +++++--
 3 files changed, 158 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
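
In short, the patch narrows latch participants to nodes that actually have work to finish: the node that joins the cluster on the exchange's topology version is excluded from the partitions release latch, but only when every alive node understands the new (V2) latch protocol. The sketch below condenses that selection logic from the diff that follows into a single illustrative class; it is not part of the patch and only assumes the ignite-core types that the diff itself uses.

    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    import org.apache.ignite.cluster.ClusterNode;
    import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
    import org.apache.ignite.lang.IgniteProductVersion;

    /** Illustrative condensation of the participant selection; not the actual ExchangeLatchManager code. */
    class LatchParticipantsSketch {
        /** Latch support exists since 2.5.0; the V2 protocol (joining nodes excluded) since 2.7.0. */
        static final IgniteProductVersion VERSION_SINCE = IgniteProductVersion.fromString("2.5.0");
        static final IgniteProductVersion PROTOCOL_V2_VERSION_SINCE = IgniteProductVersion.fromString("2.7.0");

        /** Filled from EVT_NODE_JOINED events: topology version -> node that joined on it. */
        private final Map<AffinityTopologyVersion, ClusterNode> joinedNodes;

        LatchParticipantsSketch(Map<AffinityTopologyVersion, ClusterNode> joinedNodes) {
            this.joinedNodes = joinedNodes;
        }

        /** Joining nodes may be skipped only if every alive node speaks the V2 protocol. */
        boolean canSkipJoiningNodes(Collection<ClusterNode> aliveNodes) {
            return aliveNodes.stream().allMatch(n -> n.version().compareTo(PROTOCOL_V2_VERSION_SINCE) >= 0);
        }

        /** Latch participants: nodes new enough to use latches, minus the node that joined exactly on topVer. */
        List<ClusterNode> latchParticipants(Collection<ClusterNode> aliveNodes, AffinityTopologyVersion topVer) {
            List<ClusterNode> participants = aliveNodes.stream()
                .filter(n -> n.version().compareTo(VERSION_SINCE) >= 0)
                .collect(Collectors.toList());

            // The node joining on topVer has no in-flight updates to finish,
            // so it is not required to ack the partitions release latch.
            if (canSkipJoiningNodes(aliveNodes)) {
                ClusterNode joined = joinedNodes.get(topVer);

                if (joined != null)
                    participants.remove(joined);
            }

            return participants;
        }
    }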


http://git-wip-us.apache.org/repos/asf/ignite/blob/1573f452/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f3e8fde..677e27d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1225,18 +1225,30 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         // To correctly rebalance when persistence is enabled, it is necessary to reserve history within exchange.
         partHistReserved = cctx.database().reserveHistoryForExchange();
 
-        boolean distributed = true;
+        // Skipping wait on local join is available when all cluster nodes have the same protocol.
+        boolean skipWaitOnLocalJoin = cctx.exchange().latch().canSkipJoiningNodes(initialVersion())
+            && localJoinExchange();
 
-        // Do not perform distributed partition release in case of cluster activation.
-        if (activateCluster())
-            distributed = false;
+        // Skip partition release if node has locally joined (it doesn't have any updates to be finished).
+        if (!skipWaitOnLocalJoin) {
+            boolean distributed = true;
 
-        // On first phase we wait for finishing all local tx updates, atomic updates and lock releases on all nodes.
-        waitPartitionRelease(distributed, true);
+            // Do not perform distributed partition release in case of cluster activation.
+            if (activateCluster())
+                distributed = false;
 
-        // Second phase is needed to wait for finishing all tx updates from primary to backup nodes remaining after first phase.
-        if (distributed)
-            waitPartitionRelease(false, false);
+            // On first phase we wait for finishing all local tx updates, atomic updates and lock releases on all nodes.
+            waitPartitionRelease(distributed, true);
+
+            // Second phase is needed to wait for finishing all tx updates from primary to backup nodes remaining after first phase.
+            if (distributed)
+                waitPartitionRelease(false, false);
+        }
+        else {
+            if (log.isInfoEnabled())
+                log.info("Skipped waiting for partitions release future (local node is joining) " +
+                    "[topVer=" + initialVersion() + "]");
+        }
 
         boolean topChanged = firstDiscoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
 
@@ -1422,9 +1434,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             String futInfo = RELEASE_FUTURE_DUMP_THRESHOLD > 0 && waitTime > RELEASE_FUTURE_DUMP_THRESHOLD ?
                 partReleaseFut.toString() : "NA";
 
+            String mode = distributed ? "DISTRIBUTED" : "LOCAL";
+
             if (log.isInfoEnabled())
                 log.info("Finished waiting for partition release future [topVer=" + exchangeId().topologyVersion() +
-                    ", waitTime=" + (waitEnd - waitStart) + "ms, futInfo=" + futInfo + "]");
+                    ", waitTime=" + (waitEnd - waitStart) + "ms, futInfo=" + futInfo + ", mode=" + mode + "]");
         }
 
         IgniteInternalFuture<?> locksFut = cctx.mvcc().finishLocks(exchId.topologyVersion());
@@ -1465,40 +1479,45 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             }
         }
 
-        if (releaseLatch == null)
+        if (releaseLatch == null) {
+            assert !distributed : "Partitions release latch must be initialized in distributed mode.";
+
             return;
+        }
 
         releaseLatch.countDown();
 
-        if (!localJoinExchange()) {
-            try {
-                while (true) {
-                    try {
-                        cctx.exchange().exchangerBlockingSectionBegin();
-
-                        try {
-                            releaseLatch.await(waitTimeout, TimeUnit.MILLISECONDS);
-                        }
-                        finally {
-                            cctx.exchange().exchangerBlockingSectionEnd();
-                        }
+        // For compatibility with old version where joining nodes are not waiting for latch.
+        if (!cctx.exchange().latch().canSkipJoiningNodes(initialVersion()))
+            return;
 
-                        if (log.isInfoEnabled())
-                            log.info("Finished waiting for partitions release latch: " + releaseLatch);
+        try {
+            while (true) {
+                try {
+                    cctx.exchange().exchangerBlockingSectionBegin();
 
-                        break;
+                    try {
+                        releaseLatch.await(waitTimeout, TimeUnit.MILLISECONDS);
                     }
-                    catch (IgniteFutureTimeoutCheckedException ignored) {
-                        U.warn(log, "Unable to await partitions release latch within timeout: " + releaseLatch);
-
-                        // Try to resend ack.
-                        releaseLatch.countDown();
+                    finally {
+                        cctx.exchange().exchangerBlockingSectionEnd();
                     }
+
+                    if (log.isInfoEnabled())
+                        log.info("Finished waiting for partitions release latch: " + releaseLatch);
+
+                    break;
+                }
+                catch (IgniteFutureTimeoutCheckedException ignored) {
+                    U.warn(log, "Unable to await partitions release latch within timeout: " + releaseLatch);
+
+                    // Try to resend ack.
+                    releaseLatch.countDown();
                 }
             }
-            catch (IgniteCheckedException e) {
-                U.warn(log, "Stop waiting for partitions release latch: " + e.getMessage());
-            }
+        }
+        catch (IgniteCheckedException e) {
+            U.warn(log, "Stop waiting for partitions release latch: " + e.getMessage());
         }
     }
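
The rewritten block above unconditionally acks the release latch and then waits for all participants, returning early only for compatibility when some node still runs a pre-2.7.0 latch protocol. The waiting itself is a plain ack-then-retry pattern; the minimal sketch below spells it out. ReleaseLatch here is a hypothetical stand-in for Ignite's internal Latch (whose countDown()/await() calls appear in the diff), not its real API.

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    /** Hypothetical stand-in for the distributed exchange latch used above (illustration only). */
    interface ReleaseLatch {
        /** Sends (or re-sends) this node's ack to the latch coordinator. */
        void countDown();

        /** Waits until all participants have acked; throws TimeoutException if the timeout elapses first. */
        void await(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException;
    }

    class LatchAwaitSketch {
        /** Mirrors the retry loop from the patch: ack first, then wait, re-sending the ack on every timeout. */
        static void awaitPartitionsRelease(ReleaseLatch latch, long timeoutMs) throws InterruptedException {
            latch.countDown(); // Tell the coordinator this node finished its local work.

            while (true) {
                try {
                    latch.await(timeoutMs, TimeUnit.MILLISECONDS);

                    return; // All participants acknowledged the release.
                }
                catch (TimeoutException ignored) {
                    // The ack may have been lost; re-send it and keep waiting.
                    latch.countDown();
                }
            }
        }
    }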
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1573f452/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
index 1ffe5af..35c04fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.la
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -44,12 +45,12 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 
 /**
@@ -59,6 +60,12 @@ public class ExchangeLatchManager {
     /** Version since latch management is available. */
     private static final IgniteProductVersion VERSION_SINCE = IgniteProductVersion.fromString("2.5.0");
 
+    /**
+     * Exchange latch V2 protocol introduces following optimization:
+     * Joining nodes are explicitly excluded from possible latch participants.
+     */
+    public static final IgniteProductVersion PROTOCOL_V2_VERSION_SINCE = IgniteProductVersion.fromString("2.7.0");
+
     /** Logger. */
     private final IgniteLogger log;
 
@@ -88,6 +95,10 @@ public class ExchangeLatchManager {
     @GridToStringInclude
     private final ConcurrentMap<CompletableLatchUid, ClientLatch> clientLatches = new ConcurrentHashMap<>();
 
+    /** Map (topology version -> joined node on this version). This map is needed to exclude joined nodes from latch participants. */
+    @GridToStringExclude
+    private final ConcurrentMap<AffinityTopologyVersion, ClusterNode> joinedNodes = new ConcurrentHashMap<>();
+
     /** Lock. */
     private final ReentrantLock lock = new ReentrantLock();
 
@@ -102,7 +113,7 @@ public class ExchangeLatchManager {
         this.discovery = ctx.discovery();
         this.io = ctx.io();
 
-        if (!ctx.clientNode()) {
+        if (!ctx.clientNode() && !ctx.isDaemon()) {
             ctx.io().addMessageListener(GridTopic.TOPIC_EXCHANGE, (nodeId, msg, plc) -> {
                 if (msg instanceof LatchAckMessage)
                     processAck(nodeId, (LatchAckMessage) msg);
@@ -119,8 +130,16 @@ public class ExchangeLatchManager {
                 assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : this;
 
                 // Do not process from discovery thread.
-                ctx.closure().runLocalSafe(() -> processNodeLeft(e.eventNode()));
+                // TODO: Should use queue to guarantee the order of processing left nodes.
+                ctx.closure().runLocalSafe(() -> processNodeLeft(cache.version(), e.eventNode()));
             }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+            ctx.event().addDiscoveryEventListener((e, cache) -> {
+                assert e != null;
+                assert e.type() == EVT_NODE_JOINED;
+
+                joinedNodes.put(cache.version(), e.eventNode());
+            }, EVT_NODE_JOINED);
         }
     }
 
@@ -247,10 +266,30 @@ public class ExchangeLatchManager {
     private Collection<ClusterNode> getLatchParticipants(AffinityTopologyVersion topVer) {
         Collection<ClusterNode> aliveNodes = aliveNodesForTopologyVer(topVer);
 
-        return aliveNodes
+        List<ClusterNode> participantNodes = aliveNodes
                 .stream()
                 .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
                 .collect(Collectors.toList());
+
+        if (canSkipJoiningNodes(topVer))
+            return excludeJoinedNodes(participantNodes, topVer);
+
+        return participantNodes;
+    }
+
+    /**
+     * Excludes a node that was joined on given {@code topVer} from participant nodes.
+     *
+     * @param participantNodes Participant nodes.
+     * @param topVer Topology version.
+     */
+    private List<ClusterNode> excludeJoinedNodes(List<ClusterNode> participantNodes, AffinityTopologyVersion topVer) {
+        ClusterNode joinedNode = joinedNodes.get(topVer);
+
+        if (joinedNode != null)
+            participantNodes.remove(joinedNode);
+
+        return participantNodes;
     }
 
     /**
@@ -260,12 +299,33 @@ public class ExchangeLatchManager {
     @Nullable private ClusterNode getLatchCoordinator(AffinityTopologyVersion topVer) {
         Collection<ClusterNode> aliveNodes = aliveNodesForTopologyVer(topVer);
 
-        return aliveNodes
+        List<ClusterNode> applicableNodes = aliveNodes
             .stream()
             .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
             .sorted(Comparator.comparing(ClusterNode::order))
-            .findFirst()
-            .orElse(null);
+            .collect(Collectors.toList());
+
+        if (applicableNodes.isEmpty())
+            return null;
+
+        if (canSkipJoiningNodes(topVer))
+            applicableNodes = excludeJoinedNodes(applicableNodes, topVer);
+
+        return applicableNodes.get(0);
+    }
+
+    /**
+     * Checks that latch manager can use V2 protocol and skip joining nodes from latch participants.
+     *
+     * @param topVer Topology version.
+     */
+    public boolean canSkipJoiningNodes(AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> applicableNodes = topVer.equals(AffinityTopologyVersion.NONE)
+            ? discovery.aliveServerNodes()
+            : discovery.topology(topVer.topologyVersion());
+
+        return applicableNodes.stream()
+            .allMatch(node -> node.version().compareTo(PROTOCOL_V2_VERSION_SINCE) >= 0);
     }
 
     /**
@@ -282,7 +342,7 @@ public class ExchangeLatchManager {
         lock.lock();
 
         try {
-            ClusterNode coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE);
+            ClusterNode coordinator = getLatchCoordinator(message.topVer());
 
             if (coordinator == null)
                 return;
@@ -313,11 +373,8 @@ public class ExchangeLatchManager {
                     if (latch.hasParticipant(from) && !latch.hasAck(from))
                         latch.ack(from);
                 }
-                else {
-                    pendingAcks.computeIfAbsent(latchUid, (id) -> new GridConcurrentHashSet<>());
-
-                    pendingAcks.get(latchUid).add(from);
-                }
+                else
+                    pendingAcks.computeIfAbsent(latchUid, id -> new GridConcurrentHashSet<>()).add(from);
             }
         }
         finally {
@@ -339,7 +396,6 @@ public class ExchangeLatchManager {
         latchesToRestore.addAll(clientLatches.keySet());
 
         for (CompletableLatchUid latchUid : latchesToRestore) {
-            String id = latchUid.id;
             AffinityTopologyVersion topVer = latchUid.topVer;
             Collection<ClusterNode> participants = getLatchParticipants(topVer);
 
@@ -359,7 +415,7 @@ public class ExchangeLatchManager {
      *
      * @param left Left node.
      */
-    private void processNodeLeft(ClusterNode left) {
+    private void processNodeLeft(AffinityTopologyVersion topVer, ClusterNode left) {
         assert this.crd != null : "Coordinator is not initialized";
 
         lock.lock();
@@ -368,11 +424,17 @@ public class ExchangeLatchManager {
             if (log.isDebugEnabled())
                 log.debug("Process node left " + left.id());
 
-            ClusterNode coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE);
+            ClusterNode coordinator = getLatchCoordinator(topVer);
 
             if (coordinator == null)
                 return;
 
+            // Removed node from joined nodes map.
+            joinedNodes.entrySet().stream()
+                .filter(e -> e.getValue().equals(left))
+                .map(e -> e.getKey()) // Map to topology version when node has joined.
+                .forEach(joinedNodes::remove);
+
             // Clear pending acks.
             for (Map.Entry<CompletableLatchUid, Set<UUID>> ackEntry : pendingAcks.entrySet())
                 if (ackEntry.getValue().contains(left.id()))
@@ -389,9 +451,9 @@ public class ExchangeLatchManager {
                         /* If new coordinator is not able to take control on the latch,
                            it means that all other latch participants are left from topology
                            and there is no reason to track such latch. */
-                        AffinityTopologyVersion topVer = latchEntry.getKey().topVer;
+                        AffinityTopologyVersion latchTopVer = latchEntry.getKey().topVer;
 
-                        assert getLatchParticipants(topVer).isEmpty();
+                        assert getLatchParticipants(latchTopVer).isEmpty();
 
                         latch.complete(new IgniteCheckedException("All latch participants are left from topology."));
                         clientLatches.remove(latchEntry.getKey());
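
One more moving part worth calling out before the test changes: the manager now has to remember which node joined on which topology version, and to forget it once that node leaves, otherwise the joinedNodes map would grow without bound. The sketch below shows that bookkeeping in isolation; the field and event names follow the diff, while the class and the onNodeJoined/onNodeLeft wrapper methods are hypothetical.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    import org.apache.ignite.cluster.ClusterNode;
    import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;

    /** Illustrative sketch of the joinedNodes bookkeeping added to ExchangeLatchManager. */
    class JoinedNodesBookkeepingSketch {
        /** Topology version -> node that joined on that version. */
        private final ConcurrentMap<AffinityTopologyVersion, ClusterNode> joinedNodes = new ConcurrentHashMap<>();

        /** Called from the EVT_NODE_JOINED discovery listener. */
        void onNodeJoined(AffinityTopologyVersion topVer, ClusterNode node) {
            joinedNodes.put(topVer, node);
        }

        /** Called from the EVT_NODE_LEFT / EVT_NODE_FAILED listener: drop the entry so the map does not leak. */
        void onNodeLeft(ClusterNode left) {
            joinedNodes.entrySet().removeIf(e -> e.getValue().equals(left));
        }
    }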

http://git-wip-us.apache.org/repos/asf/ignite/blob/1573f452/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
index 52cd033..d8f23ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
@@ -23,9 +23,10 @@ import com.google.common.collect.Lists;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -39,7 +40,10 @@ public class IgniteExchangeLatchManagerCoordinatorFailTest extends GridCommonAbs
     private static final String LATCH_NAME = "test";
 
     /** 5 nodes. */
-    private final AffinityTopologyVersion latchTopVer = new AffinityTopologyVersion(5, 0);
+    private final AffinityTopologyVersion latchTopVer = new AffinityTopologyVersion(5, 1);
+
+    /** Latch coordinator index. */
+    private static final int LATCH_CRD_INDEX = 0;
 
     /** Wait before latch creation. */
     private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean> beforeCreate = (mgr, syncLatch) -> {
@@ -200,6 +204,8 @@ public class IgniteExchangeLatchManagerCoordinatorFailTest extends GridCommonAbs
         IgniteEx crd = (IgniteEx) startGridsMultiThreaded(5);
         crd.cluster().active(true);
 
+        IgniteEx latchCrd = grid(LATCH_CRD_INDEX);
+
         // Latch to synchronize node states.
         CountDownLatch syncLatch = new CountDownLatch(5);
 
@@ -207,31 +213,41 @@ public class IgniteExchangeLatchManagerCoordinatorFailTest extends GridCommonAbs
 
         AtomicBoolean hasErrors = new AtomicBoolean();
 
-        for (int node = 1; node < 5; node++) {
-            IgniteEx grid = grid(node);
+        int scenarioIdx = 0;
+
+        for (int nodeId = 0; nodeId < 5; nodeId++) {
+            if (nodeId == LATCH_CRD_INDEX)
+                continue;
+
+            IgniteEx grid = grid(nodeId);
+
             ExchangeLatchManager latchMgr = grid.context().cache().context().exchange().latch();
-            final int stateIdx = node - 1;
+
+            IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean> scenario = nodeScenarios.get(scenarioIdx);
 
             IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() -> {
-                boolean success = nodeScenarios.get(stateIdx).apply(latchMgr, syncLatch);
+                boolean success = scenario.apply(latchMgr, syncLatch);
+
                 if (!success)
                     hasErrors.set(true);
-            }, 1, "latch-runner-" + node);
+            }, 1, "latch-runner-" + nodeId);
 
             finishAllLatches.add(fut);
+
+            scenarioIdx++;
         }
 
         finishAllLatches.markInitialized();
 
         // Wait while all nodes reaches their states.
         while (syncLatch.getCount() != 1) {
-            Thread.sleep(10);
+            U.sleep(10);
 
             if (hasErrors.get())
                 throw new Exception("All nodes should complete latches without errors");
         }
 
-        crd.close();
+        latchCrd.close();
 
         // Resume progress for all nodes.
         syncLatch.countDown();
