Repository: ignite Updated Branches: refs/heads/ignite-1093-2 93497acf7 -> 48397aef5
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/48397aef Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/48397aef Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/48397aef Branch: refs/heads/ignite-1093-2 Commit: 48397aef5caba8058c4cf3cd0360bbcb505c538a Parents: 93497ac Author: Anton Vinogradov <[email protected]> Authored: Sun Oct 11 15:10:05 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Sun Oct 11 15:10:05 2015 +0300 ---------------------------------------------------------------------- .../cache/GridCachePartitionExchangeManager.java | 6 +++++- .../internal/processors/cache/GridCachePreloader.java | 5 +++-- .../processors/cache/GridCachePreloaderAdapter.java | 9 +++++---- .../dht/preloader/GridDhtPartitionDemander.java | 14 ++++++-------- .../distributed/dht/preloader/GridDhtPreloader.java | 4 ++-- 5 files changed, 21 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/48397aef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 8057d18..e8c022f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1161,9 +1161,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean startEvtFired = false; + int cnt = 0; + while (!isCancelled()) { GridDhtPartitionsExchangeFuture exchFut = null; + cnt++; + try { boolean preloadFinished = true; @@ -1324,7 +1328,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } Callable c = cacheCtx.preloader().addAssignments( - assignsMap.get(cacheId), forcePreload, waitList); + assignsMap.get(cacheId), forcePreload, waitList, cnt); if (c != null) { U.log(log, "Rebalancing scheduled: [cache=" + cacheCtx.name() + http://git-wip-us.apache.org/repos/asf/ignite/blob/48397aef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 878f985..46a09f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -95,9 +95,10 @@ public interface GridCachePreloader { * @param assignments Assignments to add. * @param forcePreload Force preload flag. * @param caches Rebalancing of these caches will be finished before this started. + * @param cnt Counter. */ - public Callable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, Collection<String> caches) - throws IgniteCheckedException; + public Callable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, + Collection<String> caches, int cnt) throws IgniteCheckedException; /** * @param p Preload predicate. http://git-wip-us.apache.org/repos/asf/ignite/blob/48397aef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 0aae0dc..1266a3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -40,7 +40,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { /** Cache context. */ protected final GridCacheContext<?, ?> cctx; - /** Logger.*/ + /** Logger. */ protected final IgniteLogger log; /** Affinity. */ @@ -132,7 +132,8 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { + @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, + AffinityTopologyVersion topVer) { return new GridFinishedFuture<>(); } @@ -157,8 +158,8 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ - @Override public Callable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, Collection<String> caches) - throws IgniteCheckedException { + @Override public Callable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, + Collection<String> caches, int cnt) throws IgniteCheckedException { return null; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/48397aef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index e585eca..f965c02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -108,9 +108,6 @@ public class GridDhtPartitionDemander { /** Demand lock. */ private final ReadWriteLock demandLock; - /** Rebalancing iteration counter. */ - private long updateSeq = 0; - /** * @param cctx Cctx. * @param demandLock Demand lock. @@ -222,7 +219,7 @@ public class GridDhtPartitionDemander { try { SyncFuture wFut = (SyncFuture)cctx.kernalContext().cache().internalCache(name).preloader().syncFuture(); - if (!topologyChanged(fut)) { + if (!topologyChanged(fut) && wFut.updateSeq == fut.updateSeq) { if (!wFut.get()) fut.cancel(); } @@ -232,7 +229,7 @@ public class GridDhtPartitionDemander { } catch (IgniteInterruptedCheckedException ignored) { if (log.isDebugEnabled()) { - log.debug("Failed to wait for " + name + " cache rebalancing future (grid is stopping): " + + log.debug("Failed to wait for " + name + "[cacheName=" + cctx.name() + ']'); fut.cancel(); } @@ -248,10 +245,11 @@ public class GridDhtPartitionDemander { * @param assigns Assignments. * @param force {@code True} if dummy reassign. * @param caches Rebalancing of these caches will be finished before this started. + * @param cnt Counter. * @throws IgniteCheckedException Exception */ - Callable addAssignments(final GridDhtPreloaderAssignments assigns, boolean force, final Collection<String> caches) - throws IgniteCheckedException { + Callable addAssignments(final GridDhtPreloaderAssignments assigns, boolean force, final Collection<String> caches, + int cnt) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Adding partition assignments: " + assigns); @@ -262,7 +260,7 @@ public class GridDhtPartitionDemander { final SyncFuture oldFut = syncFut; - final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isInitial(), ++updateSeq); + final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isInitial(), cnt); if (!oldFut.isInitial()) oldFut.cancel(); http://git-wip-us.apache.org/repos/asf/ignite/blob/48397aef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index fbc74ff..b563c15 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -398,8 +398,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public Callable addAssignments(GridDhtPreloaderAssignments assignments, - boolean forcePreload, Collection<String> caches) throws IgniteCheckedException { - return demander.addAssignments(assignments, forcePreload, caches); + boolean forcePreload, Collection<String> caches, int cnt) throws IgniteCheckedException { + return demander.addAssignments(assignments, forcePreload, caches, cnt); } /**
