5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/354b8682 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/354b8682 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/354b8682 Branch: refs/heads/ignite-5578 Commit: 354b8682d97b04c642abd377afd6591727ebb12b Parents: de050c7 Author: sboikov <[email protected]> Authored: Wed Aug 2 23:28:30 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Aug 2 23:28:30 2017 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 45 ++++++++++++++++++-- .../GridDhtPartitionsExchangeFuture.java | 6 ++- 2 files changed, 46 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/354b8682/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index c8235ac..e4508ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -641,6 +641,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { + exchWorker.onKernalStop(); + cctx.gridEvents().removeDiscoveryEventListener(discoLsnr); cctx.io().removeHandler(false, 0, GridDhtPartitionsSingleMessage.class); @@ -1767,11 +1769,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana this.exchMergeTestWaitVer = exchMergeTestWaitVer; } - public void mergeExchanges(final GridDhtPartitionsExchangeFuture curFut, GridDhtPartitionsFullMessage msg) + /** + * @param curFut Current exchange future. + * @param msg Message. + * @return {@code True} if node is stopping. + * @throws IgniteInterruptedCheckedException If interrupted. + */ + public boolean mergeExchanges(final GridDhtPartitionsExchangeFuture curFut, GridDhtPartitionsFullMessage msg) throws IgniteInterruptedCheckedException { AffinityTopologyVersion resVer = msg.resultTopologyVersion(); - exchWorker.waitForExchangeFuture(resVer); + if (exchWorker.waitForExchangeFuture(resVer)) + return true; for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) { if (task instanceof GridDhtPartitionsExchangeFuture) { @@ -1811,6 +1820,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana assert evts.topologyVersion().equals(resVer) : "Invalid exchange merge result [ver=" + evts.topologyVersion() + ", expVer=" + resVer + ']'; + + return false; } /** @@ -1944,6 +1955,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** */ private boolean crd; + /** */ + private boolean stop; + /** * Constructor. */ @@ -1989,13 +2003,36 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana log.debug("Added exchange future to exchange worker: " + exchFut); } - private void waitForExchangeFuture(AffinityTopologyVersion resVer) throws IgniteInterruptedCheckedException { + /** + * + */ + private void onKernalStop() { + synchronized (this) { + stop = true; + + notifyAll(); + } + } + + /** + * @param resVer Version to wait for. + * @return {@code True} if node is stopping. + * @throws IgniteInterruptedCheckedException If interrupted. + */ + private boolean waitForExchangeFuture(AffinityTopologyVersion resVer) throws IgniteInterruptedCheckedException { synchronized (this) { - while (lastFutVer.compareTo(resVer) < 0) + while (!stop && lastFutVer.compareTo(resVer) < 0) U.wait(this); + + return stop; } } + /** + * @param resVer Exchange result version. + * @param exchFut Exchange future. + * @throws IgniteInterruptedCheckedException If interrupted. + */ private void onExchangeDone(AffinityTopologyVersion resVer, GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException { if (resVer.compareTo(exchFut.exchangeId().topologyVersion()) != 0) { http://git-wip-us.apache.org/repos/asf/ignite/blob/354b8682/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 6f77f96..5191557 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2645,7 +2645,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte resTopVer = msg.resultTopologyVersion(); - cctx.exchange().mergeExchanges(this, msg); + if (cctx.exchange().mergeExchanges(this, msg)) { + assert cctx.kernalContext().isStopping(); + + return; // Node is stopping, no need to further process exchange. + } assert resTopVer.equals(exchCtx.events().topologyVersion()) : "Unexpected result version [" + "msgVer=" + resTopVer +
