Repository: ignite
Updated Branches:
  refs/heads/master a164296b1 -> 6582b0849


IGNITE-8783 Failover tests periodically cause hanging of the whole Data Structures suite on TC

Signed-off-by: Anton Vinogradov <a...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6582b084
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6582b084
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6582b084

Branch: refs/heads/master
Commit: 6582b084943ac63c36b062c8ae66b43883ef6c19
Parents: a164296
Author: Anton Vinogradov <a...@apache.org>
Authored: Mon Jul 23 13:18:16 2018 +0300
Committer: Anton Vinogradov <a...@apache.org>
Committed: Mon Jul 23 13:18:16 2018 +0300

----------------------------------------------------------------------
 .../preloader/latch/ExchangeLatchManager.java   | 218 ++++++++++---------
 1 file changed, 119 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
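
For context, the central change in this commit is replacing the ad-hoc
T2<String, AffinityTopologyVersion> keys of the pendingAcks, serverLatches and
clientLatches maps with a dedicated CompletableLatchUid key class, and moving
latch cleanup into the final-ack path instead of removing completed latches in
several places. The sketch below is illustrative only and is not part of the
commit: LatchKey and the plain String topology version are hypothetical
stand-ins that show why such a composite key needs value-based
equals()/hashCode() so that independently created keys resolve the same map
entry.

----------------------------------------------------------------------
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Illustrative sketch, not Ignite code: a composite map key in the spirit of CompletableLatchUid. */
public class LatchKeyExample {
    /** Hypothetical stand-in for CompletableLatchUid; topVer is a plain String here. */
    static final class LatchKey {
        private final String id;
        private final String topVer;

        LatchKey(String id, String topVer) {
            this.id = id;
            this.topVer = topVer;
        }

        /** Value equality lets two independently created keys address the same entry. */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;
            if (o == null || getClass() != o.getClass())
                return false;
            LatchKey key = (LatchKey)o;
            return Objects.equals(id, key.id) && Objects.equals(topVer, key.topVer);
        }

        /** Consistent with equals(), as required for hash-based maps. */
        @Override public int hashCode() {
            return Objects.hash(id, topVer);
        }
    }

    public static void main(String[] args) {
        ConcurrentMap<LatchKey, Set<UUID>> pendingAcks = new ConcurrentHashMap<>();

        // Record a pending ack under one key instance...
        pendingAcks.computeIfAbsent(new LatchKey("exchange", "5.0"),
            k -> ConcurrentHashMap.newKeySet()).add(UUID.randomUUID());

        // ...and find it again through a different but equal key instance.
        System.out.println(pendingAcks.containsKey(new LatchKey("exchange", "5.0"))); // true
    }
}
----------------------------------------------------------------------
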


http://git-wip-us.apache.org/repos/asf/ignite/blob/6582b084/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
index 7e579cb..8a57b90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -16,11 +16,11 @@
  */
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,8 +45,6 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
@@ -80,15 +78,15 @@ public class ExchangeLatchManager {
     private volatile ClusterNode crd;
 
     /** Pending acks collection. */
-    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, Set<UUID>> pendingAcks = new ConcurrentHashMap<>();
+    private final ConcurrentMap<CompletableLatchUid, Set<UUID>> pendingAcks = new ConcurrentHashMap<>();
 
     /** Server latches collection. */
     @GridToStringInclude
-    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ServerLatch> serverLatches = new ConcurrentHashMap<>();
+    private final ConcurrentMap<CompletableLatchUid, ServerLatch> serverLatches = new ConcurrentHashMap<>();
 
     /** Client latches collection. */
     @GridToStringInclude
-    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ClientLatch> clientLatches = new ConcurrentHashMap<>();
+    private final ConcurrentMap<CompletableLatchUid, ClientLatch> clientLatches = new ConcurrentHashMap<>();
 
     /** Lock. */
     private final ReentrantLock lock = new ReentrantLock();
@@ -130,37 +128,30 @@ public class ExchangeLatchManager {
      * Creates server latch with given {@code id} and {@code topVer}.
      * Adds corresponding pending acks to it.
      *
-     * @param id Latch id.
-     * @param topVer Latch topology version.
+     * @param latchUid Latch uid.
      * @param participants Participant nodes.
      * @return Server latch instance.
      */
-    private Latch createServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
-        final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, topVer);
+    private Latch createServerLatch(CompletableLatchUid latchUid, Collection<ClusterNode> participants) {
+        assert !serverLatches.containsKey(latchUid);
 
-        if (serverLatches.containsKey(latchId))
-            return serverLatches.get(latchId);
+        ServerLatch latch = new ServerLatch(latchUid, participants);
 
-        ServerLatch latch = new ServerLatch(id, topVer, participants);
-
-        serverLatches.put(latchId, latch);
+        serverLatches.put(latchUid, latch);
 
         if (log.isDebugEnabled())
-            log.debug("Server latch is created [latch=" + latchId + ", 
participantsSize=" + participants.size() + "]");
+            log.debug("Server latch is created [latch=" + latchUid + ", 
participantsSize=" + participants.size() + "]");
 
-        if (pendingAcks.containsKey(latchId)) {
-            Set<UUID> acks = pendingAcks.get(latchId);
+        if (pendingAcks.containsKey(latchUid)) {
+            Set<UUID> acks = pendingAcks.get(latchUid);
 
             for (UUID node : acks)
                 if (latch.hasParticipant(node) && !latch.hasAck(node))
                     latch.ack(node);
 
-            pendingAcks.remove(latchId);
+            pendingAcks.remove(latchUid);
         }
 
-        if (latch.isCompleted())
-            serverLatches.remove(latchId);
-
         return latch;
     }
 
@@ -168,32 +159,23 @@ public class ExchangeLatchManager {
      * Creates client latch.
     * If there is final ack corresponds to given {@code id} and {@code topVer}, latch will be completed immediately.
      *
-     * @param id Latch id.
-     * @param topVer Latch topology version.
+     * @param latchUid Latch uid.
      * @param coordinator Coordinator node.
      * @param participants Participant nodes.
      * @return Client latch instance.
      */
-    private Latch createClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) {
-        final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, topVer);
-
-        if (clientLatches.containsKey(latchId))
-            return clientLatches.get(latchId);
+    private Latch createClientLatch(CompletableLatchUid latchUid, ClusterNode coordinator, Collection<ClusterNode> participants) {
+        assert !serverLatches.containsKey(latchUid);
+        assert !clientLatches.containsKey(latchUid);
 
-        ClientLatch latch = new ClientLatch(id, topVer, coordinator, participants);
+        ClientLatch latch = new ClientLatch(latchUid, coordinator, participants);
 
         if (log.isDebugEnabled())
-            log.debug("Client latch is created [latch=" + latchId
-                    + ", crd=" + coordinator
-                    + ", participantsSize=" + participants.size() + "]");
+            log.debug("Client latch is created [latch=" + latchUid
+                + ", crd=" + coordinator
+                + ", participantsSize=" + participants.size() + "]");
 
-        // There is final ack for created latch.
-        if (pendingAcks.containsKey(latchId)) {
-            latch.complete();
-            pendingAcks.remove(latchId);
-        }
-        else
-            clientLatches.put(latchId, latch);
+        clientLatches.put(latchUid, latch);
 
         return latch;
     }
@@ -212,20 +194,24 @@ public class ExchangeLatchManager {
         lock.lock();
 
         try {
-            ClusterNode coordinator = getLatchCoordinator(topVer);
+            final CompletableLatchUid latchUid = new CompletableLatchUid(id, topVer);
 
-            if (coordinator == null) {
-                ClientLatch latch = new ClientLatch(id, AffinityTopologyVersion.NONE, null, Collections.emptyList());
-                latch.complete();
+            CompletableLatch latch = clientLatches.containsKey(latchUid) ?
+                clientLatches.get(latchUid) : serverLatches.get(latchUid);
 
+            if (latch != null)
                 return latch;
-            }
+
+            ClusterNode coordinator = getLatchCoordinator(topVer);
+
+            if (coordinator == null)
+                return null;
 
             Collection<ClusterNode> participants = getLatchParticipants(topVer);
 
             return coordinator.isLocal()
-                ? createServerLatch(id, topVer, participants)
-                : createClientLatch(id, topVer, coordinator, participants);
+                ? createServerLatch(latchUid, participants)
+                : createClientLatch(latchUid, coordinator, participants);
         }
         finally {
             lock.unlock();
@@ -275,10 +261,11 @@ public class ExchangeLatchManager {
         Collection<ClusterNode> aliveNodes = aliveNodesForTopologyVer(topVer);
 
         return aliveNodes
-                .stream()
-                .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
-                .findFirst()
-                .orElse(null);
+            .stream()
+            .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
+            .sorted(Comparator.comparing(ClusterNode::order))
+            .findFirst()
+            .orElse(null);
     }
 
     /**
@@ -300,39 +287,36 @@ public class ExchangeLatchManager {
             if (coordinator == null)
                 return;
 
-            T2<String, AffinityTopologyVersion> latchId = new T2<>(message.latchId(), message.topVer());
+            CompletableLatchUid latchUid = new CompletableLatchUid(message.latchId(), message.topVer());
 
             if (message.isFinal()) {
                 if (log.isDebugEnabled())
-                    log.debug("Process final ack [latch=" + latchId + ", 
from=" + from + "]");
+                    log.debug("Process final ack [latch=" + latchUid + ", 
from=" + from + "]");
+
+                assert serverLatches.containsKey(latchUid) || 
clientLatches.containsKey(latchUid);
+
+                if (clientLatches.containsKey(latchUid)) {
+                    ClientLatch latch = clientLatches.remove(latchUid);
 
-                if (clientLatches.containsKey(latchId)) {
-                    ClientLatch latch = clientLatches.remove(latchId);
                     latch.complete();
                 }
-                else if (!coordinator.isLocal()) {
-                    pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>());
-                    pendingAcks.get(latchId).add(from);
-                }
-                else if (coordinator.isLocal())
-                    serverLatches.remove(latchId);
-            } else {
+
+                serverLatches.remove(latchUid);
+            }
+            else {
                 if (log.isDebugEnabled())
-                    log.debug("Process ack [latch=" + latchId + ", from=" + 
from + "]");
+                    log.debug("Process ack [latch=" + latchUid + ", from=" + 
from + "]");
 
-                if (serverLatches.containsKey(latchId)) {
-                    ServerLatch latch = serverLatches.get(latchId);
+                if (serverLatches.containsKey(latchUid)) {
+                    ServerLatch latch = serverLatches.get(latchUid);
 
-                    if (latch.hasParticipant(from) && !latch.hasAck(from)) {
+                    if (latch.hasParticipant(from) && !latch.hasAck(from))
                         latch.ack(from);
-
-                        if (latch.isCompleted())
-                            serverLatches.remove(latchId);
-                    }
                 }
                 else {
-                    pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>());
-                    pendingAcks.get(latchId).add(from);
+                    pendingAcks.computeIfAbsent(latchUid, (id) -> new GridConcurrentHashSet<>());
+
+                    pendingAcks.get(latchUid).add(from);
                 }
             }
         }
@@ -349,17 +333,18 @@ public class ExchangeLatchManager {
         if (log.isInfoEnabled())
             log.info("Become new coordinator " + crd.id());
 
-        List<T2<String, AffinityTopologyVersion>> latchesToRestore = new ArrayList<>();
+        Set<CompletableLatchUid> latchesToRestore = new HashSet<>();
+
         latchesToRestore.addAll(pendingAcks.keySet());
         latchesToRestore.addAll(clientLatches.keySet());
 
-        for (T2<String, AffinityTopologyVersion> latchId : latchesToRestore) {
-            String id = latchId.get1();
-            AffinityTopologyVersion topVer = latchId.get2();
+        for (CompletableLatchUid latchUid : latchesToRestore) {
+            String id = latchUid.id;
+            AffinityTopologyVersion topVer = latchUid.topVer;
             Collection<ClusterNode> participants = getLatchParticipants(topVer);
 
             if (!participants.isEmpty())
-                createServerLatch(id, topVer, participants);
+                createServerLatch(latchUid, participants);
         }
     }
 
@@ -389,12 +374,12 @@ public class ExchangeLatchManager {
                 return;
 
             // Clear pending acks.
-            for (Map.Entry<T2<String, AffinityTopologyVersion>, Set<UUID>> ackEntry : pendingAcks.entrySet())
+            for (Map.Entry<CompletableLatchUid, Set<UUID>> ackEntry : pendingAcks.entrySet())
                 if (ackEntry.getValue().contains(left.id()))
                     pendingAcks.get(ackEntry.getKey()).remove(left.id());
 
             // Change coordinator for client latches.
-            for (Map.Entry<T2<String, AffinityTopologyVersion>, ClientLatch> latchEntry : clientLatches.entrySet()) {
+            for (Map.Entry<CompletableLatchUid, ClientLatch> latchEntry : clientLatches.entrySet()) {
                 ClientLatch latch = latchEntry.getValue();
                 if (latch.hasCoordinator(left.id())) {
                     // Change coordinator for latch and re-send ack if necessary.
@@ -404,7 +389,7 @@ public class ExchangeLatchManager {
                         /* If new coordinator is not able to take control on the latch,
                            it means that all other latch participants are left from topology
                            and there is no reason to track such latch. */
-                        AffinityTopologyVersion topVer = latchEntry.getKey().get2();
+                        AffinityTopologyVersion topVer = latchEntry.getKey().topVer;
 
                         assert getLatchParticipants(topVer).isEmpty();
 
@@ -415,7 +400,7 @@ public class ExchangeLatchManager {
             }
 
             // Add acknowledgements from left node.
-            for (Map.Entry<T2<String, AffinityTopologyVersion>, ServerLatch> latchEntry : serverLatches.entrySet()) {
+            for (Map.Entry<CompletableLatchUid, ServerLatch> latchEntry : serverLatches.entrySet()) {
                 ServerLatch latch = latchEntry.getValue();
 
                 if (latch.hasParticipant(left.id()) && !latch.hasAck(left.id())) {
@@ -423,9 +408,6 @@ public class ExchangeLatchManager {
                         log.debug("Process node left [latch=" + 
latchEntry.getKey() + ", left=" + left.id() + "]");
 
                     latch.ack(left.id());
-
-                    if (latch.isCompleted())
-                        serverLatches.remove(latchEntry.getKey());
                 }
             }
 
@@ -458,12 +440,11 @@ public class ExchangeLatchManager {
         /**
          * Constructor.
          *
-         * @param id Latch id.
-         * @param topVer Latch topology version.
+         * @param latchUid Latch uid.
          * @param participants Participant nodes.
          */
-        ServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
-            super(id, topVer, participants);
+        ServerLatch(CompletableLatchUid latchUid, Collection<ClusterNode> participants) {
+            super(latchUid, participants);
             this.permits = new AtomicInteger(participants.size());
 
             // Send final acks when latch is completed.
@@ -558,13 +539,12 @@ public class ExchangeLatchManager {
         /**
          * Constructor.
          *
-         * @param id Latch id.
-         * @param topVer Latch topology version.
+         * @param latchUid Latch uid.
          * @param coordinator Coordinator node.
          * @param participants Participant nodes.
          */
-        ClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) {
-            super(id, topVer, participants);
+        ClientLatch(CompletableLatchUid latchUid, ClusterNode coordinator, Collection<ClusterNode> participants) {
+            super(latchUid, participants);
 
             this.coordinator = coordinator;
         }
@@ -658,13 +638,12 @@ public class ExchangeLatchManager {
         /**
          * Constructor.
          *
-         * @param id Latch id.
-         * @param topVer Latch topology version.
+         * @param latchUid Latch uid.
          * @param participants Participant nodes.
          */
-        CompletableLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
-            this.id = id;
-            this.topVer = topVer;
+        CompletableLatch(CompletableLatchUid latchUid, Collection<ClusterNode> participants) {
+            this.id = latchUid.id;
+            this.topVer = latchUid.topVer;
             this.participants = participants.stream().map(ClusterNode::id).collect(Collectors.toSet());
         }
 
@@ -724,6 +703,47 @@ public class ExchangeLatchManager {
         }
     }
 
+    /**
+     * Latch id + topology
+     */
+    private static class CompletableLatchUid {
+        /** Id. */
+        private String id;
+
+        /** Topology version. */
+        private AffinityTopologyVersion topVer;
+
+        /**
+         * @param id Id.
+         * @param topVer Topology version.
+         */
+        private CompletableLatchUid(String id, AffinityTopologyVersion topVer) {
+            this.id = id;
+            this.topVer = topVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            CompletableLatchUid uid = (CompletableLatchUid)o;
+            return Objects.equals(id, uid.id) &&
+                Objects.equals(topVer, uid.topVer);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(id, topVer);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "CompletableLatchUid{" + "id='" + id + '\'' + ", topVer=" + 
topVer + '}';
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ExchangeLatchManager.class, this);
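
A note on the getLatchCoordinator() change in the diff above: the alive nodes
are now sorted by ClusterNode::order before the first match is taken, so
coordinator selection no longer depends on the iteration order of the
alive-node collection. The sketch below is illustrative only and not part of
the commit; Node is a hypothetical stand-in for ClusterNode, and the real
method additionally filters out nodes older than VERSION_SINCE.

----------------------------------------------------------------------
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Illustrative sketch, not Ignite code: deterministic coordinator selection by node order. */
public class CoordinatorPickExample {
    /** Hypothetical stand-in for ClusterNode; order mimics ClusterNode.order(). */
    static final class Node {
        final String id;
        final long order;

        Node(String id, long order) {
            this.id = id;
            this.order = order;
        }

        long order() {
            return order;
        }
    }

    /** Lowest order wins, so every member resolves the same coordinator for the same alive set. */
    static Node pickCoordinator(List<Node> aliveNodes) {
        return aliveNodes.stream()
            .sorted(Comparator.comparing(Node::order))
            .findFirst()
            .orElse(null);
    }

    public static void main(String[] args) {
        List<Node> alive = Arrays.asList(new Node("b", 7), new Node("a", 3), new Node("c", 12));

        System.out.println(pickCoordinator(alive).id); // prints "a"
    }
}
----------------------------------------------------------------------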
