Repository: ignite Updated Branches: refs/heads/master 5c363184c -> b95f76f8a
ignite-5727 Call TcpCommunicationSpi's discovery listener first Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b95f76f8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b95f76f8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b95f76f8 Branch: refs/heads/master Commit: b95f76f8a0a3a7e920f78f20b3d814112fc6d522 Parents: 5c36318 Author: sboikov <sboi...@gridgain.com> Authored: Wed Jul 12 08:47:04 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jul 12 08:47:04 2017 +0300 ---------------------------------------------------------------------- .../eventstorage/GridEventStorageManager.java | 309 ++++++++++--------- .../eventstorage/HighPriorityListener.java | 28 ++ .../processors/cache/GridCacheMvccManager.java | 5 - .../processors/cache/GridCacheProcessor.java | 2 - .../continuous/GridContinuousProcessor.java | 91 +++--- .../communication/tcp/TcpCommunicationSpi.java | 30 +- 6 files changed, 269 insertions(+), 196 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index 1714cbb..944420f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.EventListener; import java.util.HashSet; import java.util.Iterator; @@ -83,10 +84,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB */ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> { /** Local event listeners. */ - private final ConcurrentMap<Integer, Set<EventListener>> lsnrs = new ConcurrentHashMap8<>(); - - /** Internal discovery listeners. */ - private final ConcurrentMap<Integer, Set<DiscoveryEventListener>> discoLsnrs = new ConcurrentHashMap8<>(); + private final ConcurrentMap<Integer, Listeners> lsnrs = new ConcurrentHashMap8<>(); /** Busy lock to control activity of threads. */ private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); @@ -208,8 +206,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> @Override public void printMemoryStats() { int lsnrsCnt = 0; - for (Set<EventListener> lsnrs0 : lsnrs.values()) - lsnrsCnt += lsnrs0.size(); + for (Listeners lsnrs0 : lsnrs.values()) + lsnrsCnt += lsnrs0.lsnrs.size(); X.println(">>>"); X.println(">>> Event storage manager memory stats [igniteInstanceName=" + ctx.igniteInstanceName() + ']'); @@ -250,9 +248,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> try { if (msgLsnr != null) - ctx.io().removeMessageListener( - TOPIC_EVENT, - msgLsnr); + ctx.io().removeMessageListener(TOPIC_EVENT, msgLsnr); msgLsnr = null; @@ -332,13 +328,14 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } // Override user recordable settings for daemon node. - if ((isDaemon || isUserRecordable(type)) && !isHiddenEvent(type)) + if ((isDaemon || isUserRecordable(type)) && !isHiddenEvent(type)) { try { getSpi().record(evt); } catch (IgniteSpiException e) { U.error(log, "Failed to record event: " + evt, e); } + } if (isRecordable(type)) notifyListeners(lsnrs.get(evt.type()), evt, params); @@ -669,17 +666,13 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> * @param lsnr Listener to add. * @param types Event types to subscribe listener for. */ - private void addEventListener(EventListener lsnr, int[] types) { + private void addEventListener(ListenerWrapper lsnr, int[] types) { if (!enterBusy()) return; try { - for (int t : types) { - getOrCreate(lsnrs, t).add(lsnr); - - if (!isRecordable(t)) - U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t)); - } + for (int t : types) + registerListener(lsnr, t); } finally { leaveBusy(); @@ -693,23 +686,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> * @param type Event type to subscribe listener for. * @param types Additional event types to subscribe listener for. */ - private void addEventListener(EventListener lsnr, int type, @Nullable int... types) { + private void addEventListener(ListenerWrapper lsnr, int type, @Nullable int... types) { if (!enterBusy()) return; try { - getOrCreate(lsnrs, type).add(lsnr); - - if (!isRecordable(type)) - U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type)); + registerListener(lsnr, type); if (types != null) { - for (int t : types) { - getOrCreate(lsnrs, t).add(lsnr); - - if (!isRecordable(t)) - U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t)); - } + for (int t : types) + registerListener(lsnr, t); } } finally { @@ -718,25 +704,25 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } /** - * @param lsnrs Listeners map. + * @param lsnr Listener. * @param type Event type. - * @return Listeners for given event type. */ - private <T> Collection<T> getOrCreate(ConcurrentMap<Integer, Set<T>> lsnrs, Integer type) { - Set<T> set = lsnrs.get(type); + private void registerListener(ListenerWrapper lsnr, Integer type) { + Listeners lsnrs0 = lsnrs.get(type); - if (set == null) { - set = new GridConcurrentLinkedHashSet<>(); + if (lsnrs0 == null) { + lsnrs0 = new Listeners(); - Set<T> prev = lsnrs.putIfAbsent(type, set); + Listeners prev = lsnrs.putIfAbsent(type, lsnrs0); if (prev != null) - set = prev; + lsnrs0 = prev; } - assert set != null; + lsnrs0.addListener(lsnr); - return set; + if (!isRecordable(type)) + U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type)); } /** @@ -789,29 +775,29 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> * @param types Event types. * @return Returns {@code true} if removed. */ - private boolean removeEventListener(EventListener lsnr, @Nullable int[] types) { + private boolean removeEventListener(ListenerWrapper lsnr, @Nullable int[] types) { assert lsnr != null; boolean found = false; if (F.isEmpty(types)) { - for (Set<EventListener> set : lsnrs.values()) - if (set.remove(lsnr)) + for (Listeners set : lsnrs.values()) { + if (set.removeListener(lsnr)) found = true; + } } else { assert types != null; for (int type : types) { - Set<EventListener> set = lsnrs.get(type); + Listeners set = lsnrs.get(type); - if (set != null && set.remove(lsnr)) + if (set != null && set.removeListener(lsnr)) found = true; } } - if (lsnr instanceof UserListenerWrapper) - { + if (lsnr instanceof UserListenerWrapper) { IgnitePredicate p = ((UserListenerWrapper)lsnr).listener(); if (p instanceof PlatformEventFilterListener) @@ -845,96 +831,38 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } /** - * - * @param timeout Timeout. - * @param c Optional continuation. - * @param p Optional predicate. - * @param types Event types to wait for. - * @return Event. - * @throws IgniteCheckedException Thrown in case of any errors. - */ - public Event waitForEvent(long timeout, @Nullable Runnable c, - @Nullable final IgnitePredicate<? super Event> p, int... types) throws IgniteCheckedException { - assert timeout >= 0; - - final GridFutureAdapter<Event> fut = new GridFutureAdapter<>(); - - addLocalEventListener(new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - if (p == null || p.apply(evt)) { - fut.onDone(evt); - - removeLocalEventListener(this); - } - } - }, types); - - try { - if (c != null) - c.run(); - } - catch (Exception e) { - throw new IgniteCheckedException(e); - } - - return fut.get(timeout); - } - - /** - * @param set Set of listeners. + * @param lsnrs Set of listeners. * @param evt Grid event. + * @param params Event parameters. */ - private void notifyListeners(@Nullable Collection<EventListener> set, Event evt, Object[] params) { + private void notifyListeners(@Nullable Listeners lsnrs, Event evt, Object[] params) { assert evt != null; - if (!F.isEmpty(set)) { - assert set != null; - - for (EventListener lsnr : set) { - try { - ((ListenerWrapper)lsnr).onEvent(evt, params); - } - catch (Throwable e) { - U.error(log, "Unexpected exception in listener notification for event: " + evt, e); + if (lsnrs != null) { + notifyListeners(lsnrs.highPriorityLsnrs, evt, params); - if (e instanceof Error) - throw (Error)e; - } - } + notifyListeners(lsnrs.lsnrs, evt, params); } } /** - * @param evt Discovery event - * @param cache Discovery cache. + * @param lsnrs Listeners collection. + * @param evt Event. + * @param params Event parameters. */ - private void notifyDiscoveryListeners(DiscoveryEvent evt, DiscoCache cache) { - assert evt != null; - - notifyDiscoveryListeners(discoLsnrs.get(evt.type()), evt, cache); - } - - /** - * @param set Set of listeners. - * @param evt Discovery event. - * @param cache Discovery cache. - */ - private void notifyDiscoveryListeners(@Nullable Collection<DiscoveryEventListener> set, DiscoveryEvent evt, DiscoCache cache) { - assert evt != null; - - if (!F.isEmpty(set)) { - assert set != null; + private void notifyListeners(@Nullable Collection<ListenerWrapper> lsnrs, Event evt, Object[] params) { + if (lsnrs == null || lsnrs.isEmpty()) + return; - for (DiscoveryEventListener lsnr : set) { - try { - lsnr.onEvent(evt, cache); - } - catch (Throwable e) { - U.error(log, "Unexpected exception in listener notification for event: " + evt, e); + for (EventListener lsnr : lsnrs) { + try { + ((ListenerWrapper)lsnr).onEvent(evt, params); + } + catch (Throwable e) { + U.error(log, "Unexpected exception in listener notification for event: " + evt, e); - if (e instanceof Error) - throw (Error)e; - } + if (e instanceof Error) + throw (Error)e; } } } @@ -1208,16 +1136,6 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } /** - * @param arr Array. - * @return Array copy. - */ - private boolean[] copy(boolean[] arr) { - assert arr != null; - - return Arrays.copyOf(arr, arr.length); - } - - /** * */ private class RequestListener implements GridMessageListener { @@ -1329,9 +1247,98 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } } - /** */ + /** + * + */ + static class Listeners { + /** */ + static Comparator<ListenerWrapper> ORDERED_CMP = new Comparator<ListenerWrapper>() { + @Override public int compare(ListenerWrapper lsnr1, ListenerWrapper lsnr2) { + int o1 = ((HighPriorityListener)lsnr1.listener()).order(); + int o2 = ((HighPriorityListener)lsnr2.listener()).order(); + + return Integer.compare(o1, o2); + } + }; + + /** */ + private volatile List<ListenerWrapper> highPriorityLsnrs; + + /** */ + private final Set<ListenerWrapper> lsnrs = new GridConcurrentLinkedHashSet<>(); + + /** + * @param lsnr Listener to add. + */ + void addListener(ListenerWrapper lsnr) { + if (lsnr.highPriority()) { + synchronized (this) { + List<ListenerWrapper> curLsnrs = highPriorityLsnrs; + List<ListenerWrapper> newLsnrs = new ArrayList<>(); + + if (curLsnrs != null) + newLsnrs.addAll(curLsnrs); + + assert !newLsnrs.contains(lsnr) : lsnr; + + newLsnrs.add(lsnr); + + Collections.sort(newLsnrs, ORDERED_CMP); + + highPriorityLsnrs = newLsnrs; + } + } + else + lsnrs.add(lsnr); + } + + /** + * @param lsnr Listener to remove. + * @return {@code True} + */ + boolean removeListener(ListenerWrapper lsnr) { + if (lsnr.highPriority()) { + synchronized (this) { + List<ListenerWrapper> curLsnrs = highPriorityLsnrs; + + if (curLsnrs == null) + return false; + + List<ListenerWrapper> newLsnrs = new ArrayList<>(curLsnrs); + + if (newLsnrs.remove(lsnr)) { + highPriorityLsnrs = newLsnrs; + + return true; + } + + return false; + } + } + else + return lsnrs.remove(lsnr); + } + } + + /** + * + */ private abstract static class ListenerWrapper implements EventListener { + /** + * @param evt Event. + * @param params Parameters. + */ abstract void onEvent(Event evt, Object[] params); + + /** + * @return Wrapped listener. + */ + abstract Object listener(); + + /** + * @return {@code True} if high priority listener. + */ + abstract boolean highPriority(); } /** @@ -1349,6 +1356,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } /** {@inheritDoc} */ + @Override EventListener listener() { + return lsnr; + } + + /** {@inheritDoc} */ + @Override boolean highPriority() { + return lsnr instanceof HighPriorityListener; + } + + /** {@inheritDoc} */ @Override void onEvent(Event evt, Object[] params) { lsnr.onEvent(evt); } @@ -1387,8 +1404,18 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> } /** {@inheritDoc} */ + @Override EventListener listener() { + return lsnr; + } + + /** {@inheritDoc} */ + @Override boolean highPriority() { + return lsnr instanceof HighPriorityListener; + } + + /** {@inheritDoc} */ @Override void onEvent(Event evt, Object[] params) { - // No checks there since only DiscoveryManager produses DiscoveryEvents + // No checks there since only DiscoveryManager produces DiscoveryEvents // and it uses an overloaded method with additional parameters lsnr.onEvent((DiscoveryEvent)evt, (DiscoCache)params[0]); } @@ -1426,10 +1453,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> this.lsnr = (IgnitePredicate<Event>)lsnr; } - /** - * @return User listener. - */ - private IgnitePredicate<? extends Event> listener() { + /** {@inheritDoc} */ + public IgnitePredicate<? extends Event> listener() { return lsnr; } @@ -1450,12 +1475,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> UserListenerWrapper that = (UserListenerWrapper)o; return lsnr.equals(that.lsnr); - } /** {@inheritDoc} */ @Override public int hashCode() { return lsnr.hashCode(); } + + /** {@inheritDoc} */ + @Override boolean highPriority() { + return false; + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java new file mode 100644 index 0000000..c55aa8d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.eventstorage; + +/** + * Marker interface for listeners called before 'regular' listeners. + */ +public interface HighPriorityListener { + /** + * @return Order. + */ + public int order(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index a6907b9..b156708 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -272,12 +272,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { exchLog = cctx.logger(getClass().getName() + ".exchange"); pendingExplicit = GridConcurrentFactory.newMap(); - } - /** - * Cache futures listener must be registered after communication listener. - */ - public void registerEventListener() { cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 0488a14..9cedac6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -838,8 +838,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.query().onCacheKernalStart(); - sharedCtx.mvcc().registerEventListener(); - sharedCtx.exchange().onKernalStart(active, false); } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 8b9b277..7062353 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.managers.discovery.CustomEventListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; @@ -161,46 +162,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { marsh = ctx.config().getMarshaller(); - ctx.event().addLocalEventListener(new GridLocalEventListener() { - @SuppressWarnings({"fallthrough", "TooBroadScope"}) - @Override public void onEvent(Event evt) { - assert evt instanceof DiscoveryEvent; - assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; - - UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); - - clientInfos.remove(nodeId); - - // Unregister handlers created by left node. - for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) { - UUID routineId = e.getKey(); - RemoteRoutineInfo info = e.getValue(); - - if (nodeId.equals(info.nodeId)) { - if (info.autoUnsubscribe) - unregisterRemote(routineId); - - if (info.hnd.isQuery()) - info.hnd.onNodeLeft(); - } - } - - for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) { - SyncMessageAckFuture fut = e.getValue(); - - if (fut.nodeId().equals(nodeId)) { - SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey()); - - if (fut0 != null) { - ClusterTopologyCheckedException err = new ClusterTopologyCheckedException( - "Node left grid while sending message to: " + nodeId); - - fut0.onDone(err); - } - } - } - } - }, EVT_NODE_LEFT, EVT_NODE_FAILED); + ctx.event().addLocalEventListener(new DiscoveryListener(), EVT_NODE_LEFT, EVT_NODE_FAILED); ctx.event().addLocalEventListener(new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -1424,6 +1386,55 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * + */ + private class DiscoveryListener implements GridLocalEventListener, HighPriorityListener { + /** {@inheritDoc} */ + @Override public void onEvent(Event evt) { + assert evt instanceof DiscoveryEvent; + assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; + + UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); + + clientInfos.remove(nodeId); + + // Unregister handlers created by left node. + for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) { + UUID routineId = e.getKey(); + RemoteRoutineInfo info = e.getValue(); + + if (nodeId.equals(info.nodeId)) { + if (info.autoUnsubscribe) + unregisterRemote(routineId); + + if (info.hnd.isQuery()) + info.hnd.onNodeLeft(); + } + } + + for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) { + SyncMessageAckFuture fut = e.getValue(); + + if (fut.nodeId().equals(nodeId)) { + SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey()); + + if (fut0 != null) { + ClusterTopologyCheckedException err = new ClusterTopologyCheckedException( + "Node left grid while sending message to: " + nodeId); + + fut0.onDone(err); + } + } + } + } + + /** {@inheritDoc} */ + @Override public int order() { + return 1; + } + } + + /** * Local routine info. */ @SuppressWarnings("PackageVisibleInnerClass") http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 8cdb3c0..5aca2f9 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.GridSpinReadWriteLock; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -1091,15 +1092,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** */ private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> inRecDescs = GridConcurrentFactory.newMap(); - /** Discovery listener. */ - private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - assert evt instanceof DiscoveryEvent : evt; - assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ; - - onNodeLeft(((DiscoveryEvent)evt).eventNode().id()); - } - }; + /** */ + private final GridLocalEventListener discoLsnr = new DiscoveryListener(); /** * @return {@code True} if ssl enabled. @@ -3751,6 +3745,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** * */ + private class DiscoveryListener implements GridLocalEventListener, HighPriorityListener { + /** {@inheritDoc} */ + @Override public void onEvent(Event evt) { + assert evt instanceof DiscoveryEvent : evt; + assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ; + + onNodeLeft(((DiscoveryEvent)evt).eventNode().id()); + } + + /** {@inheritDoc} */ + @Override public int order() { + return 0; + } + } + + /** + * + */ private class ShmemWorker extends GridWorker { /** */ private final IpcEndpoint endpoint;