Repository: ignite Updated Branches: refs/heads/ignite-1093-2 0566a77b5 -> d5d78c0de
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d5d78c0d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d5d78c0d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d5d78c0d Branch: refs/heads/ignite-1093-2 Commit: d5d78c0de68ec8be264510d70c842601e8a3dd89 Parents: 0566a77 Author: Anton Vinogradov <[email protected]> Authored: Sat Oct 17 11:49:52 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Sat Oct 17 11:49:52 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 60 +++++++------------- 1 file changed, 22 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d5d78c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 85649c4..d9f7b1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -225,39 +225,27 @@ public class GridDhtPartitionDemander { * @param name Cache name. * @param fut Future. */ - private void waitForCacheRebalancing(String name, RebalanceFuture fut) { + private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Waiting for " + name + " cache rebalancing [cacheName=" + cctx.name() + ']'); - try { - RebalanceFuture wFut = (RebalanceFuture)cctx.kernalContext().cache().internalCache(name).preloader().rebalanceFuture(); - - if (!topologyChanged(fut) && wFut.updateSeq == fut.updateSeq) { - if (!wFut.get()) { - U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() + - "] (cache rebalanced with missed partitions)"); + RebalanceFuture wFut = (RebalanceFuture)cctx.kernalContext().cache().internalCache(name).preloader().rebalanceFuture(); - fut.cancel(); - } - } - else { + if (!topologyChanged(fut) && wFut.updateSeq == fut.updateSeq) { + if (!wFut.get()) { U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() + - "] (topology already changed)"); + "] (cache rebalanced with missed partitions)"); - fut.cancel(); - } - } - catch (IgniteInterruptedCheckedException ignored) { - if (log.isDebugEnabled()) { - log.debug("Failed to wait for " + name + - "[cacheName=" + cctx.name() + ']'); - fut.cancel(); + return false; } + + return true; } - catch (IgniteCheckedException e) { - fut.cancel(); + else { + U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() + + "] (topology already changed)"); - throw new Error("Ordered rebalancing future should never fail: " + e.getMessage(), e); + return false; } } @@ -300,17 +288,13 @@ public class GridDhtPartitionDemander { } return new Callable<Boolean>() { - @Override public Boolean call() throws Exception{ + @Override public Boolean call() throws Exception { for (String c : caches) { - waitForCacheRebalancing(c, fut); - - if (fut.isDone()) + if (!waitForCacheRebalancing(c, fut)) return false; } - requestPartitions(fut, assigns); - - return true; + return requestPartitions(fut, assigns); } }; } @@ -345,13 +329,11 @@ public class GridDhtPartitionDemander { /** * @param fut Future. */ - private void requestPartitions(RebalanceFuture fut, GridDhtPreloaderAssignments assigns) throws IgniteCheckedException { + private boolean requestPartitions(RebalanceFuture fut, + GridDhtPreloaderAssignments assigns) throws IgniteCheckedException { for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) { - if (topologyChanged(fut)) { - fut.cancel(); - - return; - } + if (topologyChanged(fut)) + return false; final ClusterNode node = e.getKey(); @@ -418,6 +400,8 @@ public class GridDhtPartitionDemander { dw.run(node, d); } } + + return true; } /** @@ -1360,7 +1344,7 @@ public class GridDhtPartitionDemander { * @param node Node. * @param d D. */ - public void run(ClusterNode node, GridDhtPartitionDemandMessage d) throws IgniteCheckedException{ + public void run(ClusterNode node, GridDhtPartitionDemandMessage d) throws IgniteCheckedException { demandLock.readLock().lock(); try {
