Repository: ignite Updated Branches: refs/heads/ignite-5727 560e1025e -> 30e692885
ignite-5727 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30e69288 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30e69288 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30e69288 Branch: refs/heads/ignite-5727 Commit: 30e692885b38b2fc58b4115cbbdcf7a4ef4dd437 Parents: 560e102 Author: sboikov <sboi...@gridgain.com> Authored: Tue Jul 11 13:53:05 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Jul 11 13:53:05 2017 +0300 ---------------------------------------------------------------------- .../eventstorage/GridEventStorageManager.java | 54 +++++++++--- .../eventstorage/HighPriorityListener.java | 5 +- .../continuous/GridContinuousProcessor.java | 91 +++++++++++--------- .../communication/tcp/TcpCommunicationSpi.java | 6 ++ 4 files changed, 101 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/30e69288/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index dd54b83..995525c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.EventListener; import java.util.HashSet; import java.util.Iterator; @@ -1251,6 +1252,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> */ static class Listeners { /** */ + static Comparator<ListenerWrapper> ORDERED_CMP = new Comparator<ListenerWrapper>() { + @Override public int compare(ListenerWrapper lsnr1, ListenerWrapper lsnr2) { + int o1 = ((HighPriorityListener)lsnr1.listener()).order(); + int o2 = ((HighPriorityListener)lsnr2.listener()).order(); + + return Integer.compare(o1, o2); + } + }; + + /** */ private volatile List<ListenerWrapper> highPriorityLsnrs; /** */ @@ -1272,6 +1283,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> newLsnrs.add(lsnr); + Collections.sort(newLsnrs, ORDERED_CMP); + highPriorityLsnrs = newLsnrs; } } @@ -1318,6 +1331,11 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> abstract void onEvent(Event evt, Object[] params); /** + * @return Wrapped listener. + */ + abstract Object listener(); + + /** * @return {@code True} if high priority listener. */ abstract boolean highPriority(); @@ -1338,6 +1356,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } /** {@inheritDoc} */ + @Override EventListener listener() { + return lsnr; + } + + /** {@inheritDoc} */ + @Override boolean highPriority() { + return lsnr instanceof HighPriorityListener; + } + + /** {@inheritDoc} */ @Override void onEvent(Event evt, Object[] params) { lsnr.onEvent(evt); } @@ -1359,11 +1387,6 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> @Override public int hashCode() { return lsnr.hashCode(); } - - /** {@inheritDoc} */ - @Override boolean highPriority() { - return lsnr instanceof HighPriorityListener; - } } /** @@ -1381,6 +1404,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } /** {@inheritDoc} */ + @Override EventListener listener() { + return lsnr; + } + + /** {@inheritDoc} */ + @Override boolean highPriority() { + return lsnr instanceof HighPriorityListener; + } + + /** {@inheritDoc} */ @Override void onEvent(Event evt, Object[] params) { // No checks there since only DiscoveryManager produces DiscoveryEvents // and it uses an overloaded method with additional parameters @@ -1404,11 +1437,6 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> @Override public int hashCode() { return lsnr.hashCode(); } - - /** {@inheritDoc} */ - @Override boolean highPriority() { - return lsnr instanceof HighPriorityListener; - } } /** @@ -1425,10 +1453,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> this.lsnr = (IgnitePredicate<Event>)lsnr; } - /** - * @return User listener. - */ - private IgnitePredicate<? extends Event> listener() { + /** {@inheritDoc} */ + public IgnitePredicate<? extends Event> listener() { return lsnr; } http://git-wip-us.apache.org/repos/asf/ignite/blob/30e69288/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java index a840f80..853bedb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java @@ -21,5 +21,8 @@ package org.apache.ignite.internal.managers.eventstorage; * */ public interface HighPriorityListener { - // No-op. + /** + * @return Order. + */ + public int order(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/30e69288/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 8b9b277..7062353 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.managers.discovery.CustomEventListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; @@ -161,46 +162,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { marsh = ctx.config().getMarshaller(); - ctx.event().addLocalEventListener(new GridLocalEventListener() { - @SuppressWarnings({"fallthrough", "TooBroadScope"}) - @Override public void onEvent(Event evt) { - assert evt instanceof DiscoveryEvent; - assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; - - UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); - - clientInfos.remove(nodeId); - - // Unregister handlers created by left node. - for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) { - UUID routineId = e.getKey(); - RemoteRoutineInfo info = e.getValue(); - - if (nodeId.equals(info.nodeId)) { - if (info.autoUnsubscribe) - unregisterRemote(routineId); - - if (info.hnd.isQuery()) - info.hnd.onNodeLeft(); - } - } - - for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) { - SyncMessageAckFuture fut = e.getValue(); - - if (fut.nodeId().equals(nodeId)) { - SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey()); - - if (fut0 != null) { - ClusterTopologyCheckedException err = new ClusterTopologyCheckedException( - "Node left grid while sending message to: " + nodeId); - - fut0.onDone(err); - } - } - } - } - }, EVT_NODE_LEFT, EVT_NODE_FAILED); + ctx.event().addLocalEventListener(new DiscoveryListener(), EVT_NODE_LEFT, EVT_NODE_FAILED); ctx.event().addLocalEventListener(new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -1424,6 +1386,55 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * + */ + private class DiscoveryListener implements GridLocalEventListener, HighPriorityListener { + /** {@inheritDoc} */ + @Override public void onEvent(Event evt) { + assert evt instanceof DiscoveryEvent; + assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; + + UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); + + clientInfos.remove(nodeId); + + // Unregister handlers created by left node. + for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) { + UUID routineId = e.getKey(); + RemoteRoutineInfo info = e.getValue(); + + if (nodeId.equals(info.nodeId)) { + if (info.autoUnsubscribe) + unregisterRemote(routineId); + + if (info.hnd.isQuery()) + info.hnd.onNodeLeft(); + } + } + + for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) { + SyncMessageAckFuture fut = e.getValue(); + + if (fut.nodeId().equals(nodeId)) { + SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey()); + + if (fut0 != null) { + ClusterTopologyCheckedException err = new ClusterTopologyCheckedException( + "Node left grid while sending message to: " + nodeId); + + fut0.onDone(err); + } + } + } + } + + /** {@inheritDoc} */ + @Override public int order() { + return 1; + } + } + + /** * Local routine info. */ @SuppressWarnings("PackageVisibleInnerClass") http://git-wip-us.apache.org/repos/asf/ignite/blob/30e69288/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 9c885ce..5aca2f9 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -3746,12 +3746,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * */ private class DiscoveryListener implements GridLocalEventListener, HighPriorityListener { + /** {@inheritDoc} */ @Override public void onEvent(Event evt) { assert evt instanceof DiscoveryEvent : evt; assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ; onNodeLeft(((DiscoveryEvent)evt).eventNode().id()); } + + /** {@inheritDoc} */ + @Override public int order() { + return 0; + } } /**