Repository: ignite
Updated Branches:
  refs/heads/master 5c363184c -> b95f76f8a


ignite-5727 Call TcpCommunicationSpi's discovery listener first


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b95f76f8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b95f76f8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b95f76f8

Branch: refs/heads/master
Commit: b95f76f8a0a3a7e920f78f20b3d814112fc6d522
Parents: 5c36318
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Jul 12 08:47:04 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Jul 12 08:47:04 2017 +0300

----------------------------------------------------------------------
 .../eventstorage/GridEventStorageManager.java   | 309 ++++++++++---------
 .../eventstorage/HighPriorityListener.java      |  28 ++
 .../processors/cache/GridCacheMvccManager.java  |   5 -
 .../processors/cache/GridCacheProcessor.java    |   2 -
 .../continuous/GridContinuousProcessor.java     |  91 +++---
 .../communication/tcp/TcpCommunicationSpi.java  |  30 +-
 6 files changed, 269 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index 1714cbb..944420f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.EventListener;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -83,10 +84,7 @@ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB
  */
 public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi> {
     /** Local event listeners. */
-    private final ConcurrentMap<Integer, Set<EventListener>> lsnrs = new 
ConcurrentHashMap8<>();
-
-    /** Internal discovery listeners. */
-    private final ConcurrentMap<Integer, Set<DiscoveryEventListener>> 
discoLsnrs = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<Integer, Listeners> lsnrs = new 
ConcurrentHashMap8<>();
 
     /** Busy lock to control activity of threads. */
     private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
@@ -208,8 +206,8 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
     @Override public void printMemoryStats() {
         int lsnrsCnt = 0;
 
-        for (Set<EventListener> lsnrs0 : lsnrs.values())
-            lsnrsCnt += lsnrs0.size();
+        for (Listeners lsnrs0 : lsnrs.values())
+            lsnrsCnt += lsnrs0.lsnrs.size();
 
         X.println(">>>");
         X.println(">>> Event storage manager memory stats 
[igniteInstanceName=" + ctx.igniteInstanceName() + ']');
@@ -250,9 +248,7 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
 
         try {
             if (msgLsnr != null)
-                ctx.io().removeMessageListener(
-                    TOPIC_EVENT,
-                    msgLsnr);
+                ctx.io().removeMessageListener(TOPIC_EVENT, msgLsnr);
 
             msgLsnr = null;
 
@@ -332,13 +328,14 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
             }
 
             // Override user recordable settings for daemon node.
-            if ((isDaemon || isUserRecordable(type)) && !isHiddenEvent(type))
+            if ((isDaemon || isUserRecordable(type)) && !isHiddenEvent(type)) {
                 try {
                     getSpi().record(evt);
                 }
                 catch (IgniteSpiException e) {
                     U.error(log, "Failed to record event: " + evt, e);
                 }
+            }
 
             if (isRecordable(type))
                 notifyListeners(lsnrs.get(evt.type()), evt, params);
@@ -669,17 +666,13 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
      * @param lsnr Listener to add.
      * @param types Event types to subscribe listener for.
      */
-    private void addEventListener(EventListener lsnr, int[] types) {
+    private void addEventListener(ListenerWrapper lsnr, int[] types) {
         if (!enterBusy())
             return;
 
         try {
-            for (int t : types) {
-                getOrCreate(lsnrs, t).add(lsnr);
-
-                if (!isRecordable(t))
-                    U.warn(log, "Added listener for disabled event type: " + 
U.gridEventName(t));
-            }
+            for (int t : types)
+                registerListener(lsnr, t);
         }
         finally {
             leaveBusy();
@@ -693,23 +686,16 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
      * @param type Event type to subscribe listener for.
      * @param types Additional event types to subscribe listener for.
      */
-    private void addEventListener(EventListener lsnr, int type, @Nullable 
int... types) {
+    private void addEventListener(ListenerWrapper lsnr, int type, @Nullable 
int... types) {
         if (!enterBusy())
             return;
 
         try {
-            getOrCreate(lsnrs, type).add(lsnr);
-
-            if (!isRecordable(type))
-                U.warn(log, "Added listener for disabled event type: " + 
U.gridEventName(type));
+            registerListener(lsnr, type);
 
             if (types != null) {
-                for (int t : types) {
-                    getOrCreate(lsnrs, t).add(lsnr);
-
-                    if (!isRecordable(t))
-                        U.warn(log, "Added listener for disabled event type: " 
+ U.gridEventName(t));
-                }
+                for (int t : types)
+                    registerListener(lsnr, t);
             }
         }
         finally {
@@ -718,25 +704,25 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
     }
 
     /**
-     * @param lsnrs Listeners map.
+     * @param lsnr Listener.
      * @param type Event type.
-     * @return Listeners for given event type.
      */
-    private <T> Collection<T> getOrCreate(ConcurrentMap<Integer, Set<T>> 
lsnrs, Integer type) {
-        Set<T> set = lsnrs.get(type);
+    private void registerListener(ListenerWrapper lsnr, Integer type) {
+        Listeners lsnrs0 = lsnrs.get(type);
 
-        if (set == null) {
-            set = new GridConcurrentLinkedHashSet<>();
+        if (lsnrs0 == null) {
+            lsnrs0 = new Listeners();
 
-            Set<T> prev = lsnrs.putIfAbsent(type, set);
+            Listeners prev = lsnrs.putIfAbsent(type, lsnrs0);
 
             if (prev != null)
-                set = prev;
+                lsnrs0 = prev;
         }
 
-        assert set != null;
+        lsnrs0.addListener(lsnr);
 
-        return set;
+        if (!isRecordable(type))
+            U.warn(log, "Added listener for disabled event type: " + 
U.gridEventName(type));
     }
 
     /**
@@ -789,29 +775,29 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
      * @param types Event types.
      * @return Returns {@code true} if removed.
      */
-    private boolean removeEventListener(EventListener lsnr, @Nullable int[] 
types) {
+    private boolean removeEventListener(ListenerWrapper lsnr, @Nullable int[] 
types) {
         assert lsnr != null;
 
         boolean found = false;
 
         if (F.isEmpty(types)) {
-            for (Set<EventListener> set : lsnrs.values())
-                if (set.remove(lsnr))
+            for (Listeners set : lsnrs.values()) {
+                if (set.removeListener(lsnr))
                     found = true;
+            }
         }
         else {
             assert types != null;
 
             for (int type : types) {
-                Set<EventListener> set = lsnrs.get(type);
+                Listeners set = lsnrs.get(type);
 
-                if (set != null && set.remove(lsnr))
+                if (set != null && set.removeListener(lsnr))
                     found = true;
             }
         }
 
-        if (lsnr instanceof UserListenerWrapper)
-        {
+        if (lsnr instanceof UserListenerWrapper) {
             IgnitePredicate p = ((UserListenerWrapper)lsnr).listener();
 
             if (p instanceof PlatformEventFilterListener)
@@ -845,96 +831,38 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
     }
 
     /**
-     *
-     * @param timeout Timeout.
-     * @param c Optional continuation.
-     * @param p Optional predicate.
-     * @param types Event types to wait for.
-     * @return Event.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    public Event waitForEvent(long timeout, @Nullable Runnable c,
-        @Nullable final IgnitePredicate<? super Event> p, int... types) throws 
IgniteCheckedException {
-        assert timeout >= 0;
-
-        final GridFutureAdapter<Event> fut = new GridFutureAdapter<>();
-
-        addLocalEventListener(new GridLocalEventListener() {
-            @Override public void onEvent(Event evt) {
-                if (p == null || p.apply(evt)) {
-                    fut.onDone(evt);
-
-                    removeLocalEventListener(this);
-                }
-            }
-        }, types);
-
-        try {
-            if (c != null)
-                c.run();
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException(e);
-        }
-
-        return fut.get(timeout);
-    }
-
-    /**
-     * @param set Set of listeners.
+     * @param lsnrs Listeners registered for the event type (may be {@code null}).
      * @param evt Grid event.
+     * @param params Event parameters.
      */
-    private void notifyListeners(@Nullable Collection<EventListener> set, 
Event evt, Object[] params) {
+    private void notifyListeners(@Nullable Listeners lsnrs, Event evt, 
Object[] params) {
         assert evt != null;
 
-        if (!F.isEmpty(set)) {
-            assert set != null;
-
-            for (EventListener lsnr : set) {
-                try {
-                    ((ListenerWrapper)lsnr).onEvent(evt, params);
-                }
-                catch (Throwable e) {
-                    U.error(log, "Unexpected exception in listener 
notification for event: " + evt, e);
+        if (lsnrs != null) {
+            notifyListeners(lsnrs.highPriorityLsnrs, evt, params);
 
-                    if (e instanceof Error)
-                        throw (Error)e;
-                }
-            }
+            notifyListeners(lsnrs.lsnrs, evt, params);
         }
     }
 
     /**
-     * @param evt Discovery event
-     * @param cache Discovery cache.
+     * @param lsnrs Listeners collection.
+     * @param evt Event.
+     * @param params Event parameters.
      */
-    private void notifyDiscoveryListeners(DiscoveryEvent evt, DiscoCache 
cache) {
-        assert evt != null;
-
-        notifyDiscoveryListeners(discoLsnrs.get(evt.type()), evt, cache);
-    }
-
-    /**
-     * @param set Set of listeners.
-     * @param evt Discovery event.
-     * @param cache Discovery cache.
-     */
-    private void notifyDiscoveryListeners(@Nullable 
Collection<DiscoveryEventListener> set, DiscoveryEvent evt, DiscoCache cache) {
-        assert evt != null;
-
-        if (!F.isEmpty(set)) {
-            assert set != null;
+    private void notifyListeners(@Nullable Collection<ListenerWrapper> lsnrs, 
Event evt, Object[] params) {
+        if (lsnrs == null || lsnrs.isEmpty())
+            return;
 
-            for (DiscoveryEventListener lsnr : set) {
-                try {
-                    lsnr.onEvent(evt, cache);
-                }
-                catch (Throwable e) {
-                    U.error(log, "Unexpected exception in listener 
notification for event: " + evt, e);
+        for (EventListener lsnr : lsnrs) {
+            try {
+                ((ListenerWrapper)lsnr).onEvent(evt, params);
+            }
+            catch (Throwable e) {
+                U.error(log, "Unexpected exception in listener notification 
for event: " + evt, e);
 
-                    if (e instanceof Error)
-                        throw (Error)e;
-                }
+                if (e instanceof Error)
+                    throw (Error)e;
             }
         }
     }
@@ -1208,16 +1136,6 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
     }
 
     /**
-     * @param arr Array.
-     * @return Array copy.
-     */
-    private boolean[] copy(boolean[] arr) {
-        assert arr != null;
-
-        return Arrays.copyOf(arr, arr.length);
-    }
-
-    /**
      *
      */
     private class RequestListener implements GridMessageListener {
@@ -1329,9 +1247,98 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
         }
     }
 
-    /** */
+    /**
+     *
+     */
+    static class Listeners {
+        /** */
+        static Comparator<ListenerWrapper> ORDERED_CMP = new 
Comparator<ListenerWrapper>() {
+            @Override public int compare(ListenerWrapper lsnr1, 
ListenerWrapper lsnr2) {
+                int o1 = ((HighPriorityListener)lsnr1.listener()).order();
+                int o2 = ((HighPriorityListener)lsnr2.listener()).order();
+
+                return Integer.compare(o1, o2);
+            }
+        };
+
+        /** */
+        private volatile List<ListenerWrapper> highPriorityLsnrs;
+
+        /** */
+        private final Set<ListenerWrapper> lsnrs = new 
GridConcurrentLinkedHashSet<>();
+
+        /**
+         * @param lsnr Listener to add.
+         */
+        void addListener(ListenerWrapper lsnr) {
+            if (lsnr.highPriority()) {
+                synchronized (this) {
+                    List<ListenerWrapper> curLsnrs = highPriorityLsnrs;
+                    List<ListenerWrapper> newLsnrs = new ArrayList<>();
+
+                    if (curLsnrs != null)
+                        newLsnrs.addAll(curLsnrs);
+
+                    assert !newLsnrs.contains(lsnr) : lsnr;
+
+                    newLsnrs.add(lsnr);
+
+                    Collections.sort(newLsnrs, ORDERED_CMP);
+
+                    highPriorityLsnrs = newLsnrs;
+                }
+            }
+            else
+                lsnrs.add(lsnr);
+        }
+
+        /**
+         * @param lsnr Listener to remove.
+         * @return {@code True} if the listener was found and removed.
+         */
+        boolean removeListener(ListenerWrapper lsnr) {
+            if (lsnr.highPriority()) {
+                synchronized (this) {
+                    List<ListenerWrapper> curLsnrs = highPriorityLsnrs;
+
+                    if (curLsnrs == null)
+                        return false;
+
+                    List<ListenerWrapper> newLsnrs = new ArrayList<>(curLsnrs);
+
+                    if (newLsnrs.remove(lsnr)) {
+                        highPriorityLsnrs = newLsnrs;
+
+                        return true;
+                    }
+
+                    return false;
+                }
+            }
+            else
+                return lsnrs.remove(lsnr);
+        }
+    }
+
+    /**
+     *
+     */
     private abstract static class ListenerWrapper implements EventListener {
+        /**
+         * @param evt Event.
+         * @param params Parameters.
+         */
         abstract void onEvent(Event evt, Object[] params);
+
+        /**
+         * @return Wrapped listener.
+         */
+        abstract Object listener();
+
+        /**
+         * @return {@code True} if high priority listener.
+         */
+        abstract boolean highPriority();
     }
 
     /**
@@ -1349,6 +1356,16 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
         }
 
         /** {@inheritDoc} */
+        @Override EventListener listener() {
+            return lsnr;
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean highPriority() {
+            return lsnr instanceof HighPriorityListener;
+        }
+
+        /** {@inheritDoc} */
         @Override void onEvent(Event evt, Object[] params) {
             lsnr.onEvent(evt);
         }
@@ -1387,8 +1404,18 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
         }
 
         /** {@inheritDoc} */
+        @Override EventListener listener() {
+            return lsnr;
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean highPriority() {
+            return lsnr instanceof HighPriorityListener;
+        }
+
+        /** {@inheritDoc} */
         @Override void onEvent(Event evt, Object[] params) {
-            // No checks there since only DiscoveryManager produses 
DiscoveryEvents
+            // No checks there since only DiscoveryManager produces 
DiscoveryEvents
             // and it uses an overloaded method with additional parameters
             lsnr.onEvent((DiscoveryEvent)evt, (DiscoCache)params[0]);
         }
@@ -1426,10 +1453,8 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
             this.lsnr = (IgnitePredicate<Event>)lsnr;
         }
 
-        /**
-         * @return User listener.
-         */
-        private IgnitePredicate<? extends Event> listener() {
+        /** {@inheritDoc} */
+        public IgnitePredicate<? extends Event> listener() {
             return lsnr;
         }
 
@@ -1450,12 +1475,16 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
             UserListenerWrapper that = (UserListenerWrapper)o;
 
             return lsnr.equals(that.lsnr);
-
         }
 
         /** {@inheritDoc} */
         @Override public int hashCode() {
             return lsnr.hashCode();
         }
+
+        /** {@inheritDoc} */
+        @Override boolean highPriority() {
+            return false;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java
new file mode 100644
index 0000000..c55aa8d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.eventstorage;
+
+/**
+ * Interface for listeners that are notified before 'regular' listeners.
+ */
+public interface HighPriorityListener {
+    /**
+     * @return Order value; high priority listeners with a lower value are notified first.
+     */
+    public int order();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index a6907b9..b156708 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -272,12 +272,7 @@ public class GridCacheMvccManager extends 
GridCacheSharedManagerAdapter {
         exchLog = cctx.logger(getClass().getName() + ".exchange");
 
         pendingExplicit = GridConcurrentFactory.newMap();
-    }
 
-    /**
-     * Cache futures listener must be registered after communication listener.
-     */
-    public void registerEventListener() {
         cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, 
EVT_NODE_LEFT);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 0488a14..9cedac6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -838,8 +838,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
             ctx.query().onCacheKernalStart();
 
-            sharedCtx.mvcc().registerEventListener();
-
             sharedCtx.exchange().onKernalStart(active, false);
         }
         finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 8b9b277..7062353 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -55,6 +55,7 @@ import 
org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.managers.discovery.CustomEventListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
@@ -161,46 +162,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
         marsh = ctx.config().getMarshaller();
 
-        ctx.event().addLocalEventListener(new GridLocalEventListener() {
-            @SuppressWarnings({"fallthrough", "TooBroadScope"})
-            @Override public void onEvent(Event evt) {
-                assert evt instanceof DiscoveryEvent;
-                assert evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED;
-
-                UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
-
-                clientInfos.remove(nodeId);
-
-                // Unregister handlers created by left node.
-                for (Map.Entry<UUID, RemoteRoutineInfo> e : 
rmtInfos.entrySet()) {
-                    UUID routineId = e.getKey();
-                    RemoteRoutineInfo info = e.getValue();
-
-                    if (nodeId.equals(info.nodeId)) {
-                        if (info.autoUnsubscribe)
-                            unregisterRemote(routineId);
-
-                        if (info.hnd.isQuery())
-                            info.hnd.onNodeLeft();
-                    }
-                }
-
-                for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : 
syncMsgFuts.entrySet()) {
-                    SyncMessageAckFuture fut = e.getValue();
-
-                    if (fut.nodeId().equals(nodeId)) {
-                        SyncMessageAckFuture fut0 = 
syncMsgFuts.remove(e.getKey());
-
-                        if (fut0 != null) {
-                            ClusterTopologyCheckedException err = new 
ClusterTopologyCheckedException(
-                                "Node left grid while sending message to: " + 
nodeId);
-
-                            fut0.onDone(err);
-                        }
-                    }
-                }
-            }
-        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+        ctx.event().addLocalEventListener(new DiscoveryListener(), 
EVT_NODE_LEFT, EVT_NODE_FAILED);
 
         ctx.event().addLocalEventListener(new GridLocalEventListener() {
             @Override public void onEvent(Event evt) {
@@ -1424,6 +1386,55 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     *
+     */
+    private class DiscoveryListener implements GridLocalEventListener, 
HighPriorityListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(Event evt) {
+            assert evt instanceof DiscoveryEvent;
+            assert evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED;
+
+            UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
+
+            clientInfos.remove(nodeId);
+
+            // Unregister handlers created by left node.
+            for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
+                UUID routineId = e.getKey();
+                RemoteRoutineInfo info = e.getValue();
+
+                if (nodeId.equals(info.nodeId)) {
+                    if (info.autoUnsubscribe)
+                        unregisterRemote(routineId);
+
+                    if (info.hnd.isQuery())
+                        info.hnd.onNodeLeft();
+                }
+            }
+
+            for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : 
syncMsgFuts.entrySet()) {
+                SyncMessageAckFuture fut = e.getValue();
+
+                if (fut.nodeId().equals(nodeId)) {
+                    SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey());
+
+                    if (fut0 != null) {
+                        ClusterTopologyCheckedException err = new 
ClusterTopologyCheckedException(
+                            "Node left grid while sending message to: " + 
nodeId);
+
+                        fut0.onDone(err);
+                    }
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public int order() {
+            return 1;
+        }
+    }
+
+    /**
      * Local routine info.
      */
     @SuppressWarnings("PackageVisibleInnerClass")

http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 8cdb3c0..5aca2f9 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -1091,15 +1092,8 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
     /** */
     private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> 
inRecDescs = GridConcurrentFactory.newMap();
 
-    /** Discovery listener. */
-    private final GridLocalEventListener discoLsnr = new 
GridLocalEventListener() {
-        @Override public void onEvent(Event evt) {
-            assert evt instanceof DiscoveryEvent : evt;
-            assert evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED ;
-
-            onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
-        }
-    };
+    /** */
+    private final GridLocalEventListener discoLsnr = new DiscoveryListener();
 
     /**
      * @return {@code True} if ssl enabled.
@@ -3751,6 +3745,24 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
     /**
      *
      */
+    private class DiscoveryListener implements GridLocalEventListener, 
HighPriorityListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(Event evt) {
+            assert evt instanceof DiscoveryEvent : evt;
+            assert evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED ;
+
+            onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
+        }
+
+        /** {@inheritDoc} */
+        @Override public int order() {
+            return 0;
+        }
+    }
+
+    /**
+     *
+     */
     private class ShmemWorker extends GridWorker {
         /** */
         private final IpcEndpoint endpoint;

Reply via email to