Repository: ignite
Updated Branches:
  refs/heads/ignite-5727 560e1025e -> 30e692885


ignite-5727


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30e69288
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30e69288
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30e69288

Branch: refs/heads/ignite-5727
Commit: 30e692885b38b2fc58b4115cbbdcf7a4ef4dd437
Parents: 560e102
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Jul 11 13:53:05 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Jul 11 13:53:05 2017 +0300

----------------------------------------------------------------------
 .../eventstorage/GridEventStorageManager.java   | 54 +++++++++---
 .../eventstorage/HighPriorityListener.java      |  5 +-
 .../continuous/GridContinuousProcessor.java     | 91 +++++++++++---------
 .../communication/tcp/TcpCommunicationSpi.java  |  6 ++
 4 files changed, 101 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/30e69288/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index dd54b83..995525c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.EventListener;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -1251,6 +1252,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
      */
     static class Listeners {
         /** */
+        static Comparator<ListenerWrapper> ORDERED_CMP = new Comparator<ListenerWrapper>() {
+            @Override public int compare(ListenerWrapper lsnr1, ListenerWrapper lsnr2) {
+                int o1 = ((HighPriorityListener)lsnr1.listener()).order();
+                int o2 = ((HighPriorityListener)lsnr2.listener()).order();
+
+                return Integer.compare(o1, o2);
+            }
+        };
+
+        /** */
         private volatile List<ListenerWrapper> highPriorityLsnrs;
 
         /** */
@@ -1272,6 +1283,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
 
                     newLsnrs.add(lsnr);
 
+                    Collections.sort(newLsnrs, ORDERED_CMP);
+
                     highPriorityLsnrs = newLsnrs;
                 }
             }
@@ -1318,6 +1331,11 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         abstract void onEvent(Event evt, Object[] params);
 
         /**
+         * @return Wrapped listener.
+         */
+        abstract Object listener();
+
+        /**
          * @return {@code True} if high priority listener.
          */
         abstract boolean highPriority();
@@ -1338,6 +1356,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         }
 
         /** {@inheritDoc} */
+        @Override EventListener listener() {
+            return lsnr;
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean highPriority() {
+            return lsnr instanceof HighPriorityListener;
+        }
+
+        /** {@inheritDoc} */
         @Override void onEvent(Event evt, Object[] params) {
             lsnr.onEvent(evt);
         }
@@ -1359,11 +1387,6 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         @Override public int hashCode() {
             return lsnr.hashCode();
         }
-
-        /** {@inheritDoc} */
-        @Override boolean highPriority() {
-            return lsnr instanceof HighPriorityListener;
-        }
     }
 
     /**
@@ -1381,6 +1404,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         }
 
         /** {@inheritDoc} */
+        @Override EventListener listener() {
+            return lsnr;
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean highPriority() {
+            return lsnr instanceof HighPriorityListener;
+        }
+
+        /** {@inheritDoc} */
         @Override void onEvent(Event evt, Object[] params) {
             // No checks there since only DiscoveryManager produces DiscoveryEvents
             // and it uses an overloaded method with additional parameters
@@ -1404,11 +1437,6 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         @Override public int hashCode() {
             return lsnr.hashCode();
         }
-
-        /** {@inheritDoc} */
-        @Override boolean highPriority() {
-            return lsnr instanceof HighPriorityListener;
-        }
     }
 
     /**
@@ -1425,10 +1453,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
             this.lsnr = (IgnitePredicate<Event>)lsnr;
         }
 
-        /**
-         * @return User listener.
-         */
-        private IgnitePredicate<? extends Event> listener() {
+        /** {@inheritDoc} */
+        public IgnitePredicate<? extends Event> listener() {
             return lsnr;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/30e69288/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java
index a840f80..853bedb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java
@@ -21,5 +21,8 @@ package org.apache.ignite.internal.managers.eventstorage;
  *
  */
 public interface HighPriorityListener {
-    // No-op.
+    /**
+     * @return Order.
+     */
+    public int order();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/30e69288/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 8b9b277..7062353 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.managers.discovery.CustomEventListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
@@ -161,46 +162,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         marsh = ctx.config().getMarshaller();
 
-        ctx.event().addLocalEventListener(new GridLocalEventListener() {
-            @SuppressWarnings({"fallthrough", "TooBroadScope"})
-            @Override public void onEvent(Event evt) {
-                assert evt instanceof DiscoveryEvent;
-                assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
-
-                UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
-
-                clientInfos.remove(nodeId);
-
-                // Unregister handlers created by left node.
-                for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
-                    UUID routineId = e.getKey();
-                    RemoteRoutineInfo info = e.getValue();
-
-                    if (nodeId.equals(info.nodeId)) {
-                        if (info.autoUnsubscribe)
-                            unregisterRemote(routineId);
-
-                        if (info.hnd.isQuery())
-                            info.hnd.onNodeLeft();
-                    }
-                }
-
-                for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) {
-                    SyncMessageAckFuture fut = e.getValue();
-
-                    if (fut.nodeId().equals(nodeId)) {
-                        SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey());
-
-                        if (fut0 != null) {
-                            ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
-                                "Node left grid while sending message to: " + nodeId);
-
-                            fut0.onDone(err);
-                        }
-                    }
-                }
-            }
-        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+        ctx.event().addLocalEventListener(new DiscoveryListener(), EVT_NODE_LEFT, EVT_NODE_FAILED);
 
         ctx.event().addLocalEventListener(new GridLocalEventListener() {
             @Override public void onEvent(Event evt) {
@@ -1424,6 +1386,55 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
+     *
+     */
+    private class DiscoveryListener implements GridLocalEventListener, HighPriorityListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(Event evt) {
+            assert evt instanceof DiscoveryEvent;
+            assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
+
+            UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
+
+            clientInfos.remove(nodeId);
+
+            // Unregister handlers created by left node.
+            for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
+                UUID routineId = e.getKey();
+                RemoteRoutineInfo info = e.getValue();
+
+                if (nodeId.equals(info.nodeId)) {
+                    if (info.autoUnsubscribe)
+                        unregisterRemote(routineId);
+
+                    if (info.hnd.isQuery())
+                        info.hnd.onNodeLeft();
+                }
+            }
+
+            for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) {
+                SyncMessageAckFuture fut = e.getValue();
+
+                if (fut.nodeId().equals(nodeId)) {
+                    SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey());
+
+                    if (fut0 != null) {
+                        ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
+                            "Node left grid while sending message to: " + nodeId);
+
+                        fut0.onDone(err);
+                    }
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public int order() {
+            return 1;
+        }
+    }
+
+    /**
      * Local routine info.
      */
     @SuppressWarnings("PackageVisibleInnerClass")

http://git-wip-us.apache.org/repos/asf/ignite/blob/30e69288/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 9c885ce..5aca2f9 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3746,12 +3746,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      *
      */
     private class DiscoveryListener implements GridLocalEventListener, HighPriorityListener {
+        /** {@inheritDoc} */
         @Override public void onEvent(Event evt) {
             assert evt instanceof DiscoveryEvent : evt;
             assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
 
             onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
         }
+
+        /** {@inheritDoc} */
+        @Override public int order() {
+            return 0;
+        }
     }
 
     /**

Reply via email to