Repository: ignite Updated Branches: refs/heads/ignite-1093-3 bc0423dfd -> 645f86a7f
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/645f86a7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/645f86a7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/645f86a7 Branch: refs/heads/ignite-1093-3 Commit: 645f86a7f95c3711fe93c89662a0e3baf2cf7e42 Parents: bc0423d Author: Anton Vinogradov <[email protected]> Authored: Mon Nov 2 20:21:09 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Mon Nov 2 20:21:09 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 4 +- .../dht/preloader/GridDhtPreloader.java | 40 ++++++++++---------- 2 files changed, 24 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/645f86a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 2c9b422..43248ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -145,6 +145,8 @@ public class GridDhtPartitionDemander { * Stop. */ void stop() { + rebalanceFut.cancel(); + lastExchangeFut = null; lastTimeoutObj.set(null); @@ -1005,7 +1007,7 @@ public class GridDhtPartitionDemander { if (!cancelled && !cctx.preloader().syncFuture().isDone()) ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone(); - onDone(true); + onDone(!cancelled); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/645f86a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index c3472b4..89e8f9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -21,11 +21,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Queue; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; @@ -117,10 +116,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { new ConcurrentHashMap8<>(); /** */ - private final ConcurrentLinkedDeque8<GridDhtLocalPartition> partitionsToEvict = new ConcurrentLinkedDeque8<>(); + private final ConcurrentLinkedDeque8<GridDhtLocalPartition> partsToEvict = new ConcurrentLinkedDeque8<>(); /** */ - private final AtomicReference<Integer> partitionsEvictionOwning = new AtomicReference<>(0); + private final AtomicInteger partsEvictOwning = new AtomicInteger(); /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @@ -771,34 +770,37 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public void evictPartitionAsync(GridDhtLocalPartition part) { - partitionsToEvict.add(part); + partsToEvict.add(part); - if (partitionsEvictionOwning.get() == 0 && partitionsEvictionOwning.compareAndSet(0, 1)) { + if (partsEvictOwning.get() == 0 && partsEvictOwning.compareAndSet(0, 1)) { cctx.closures().callLocalSafe(new GPC<Boolean>() { @Override public Boolean call() { - boolean firstRun = true; + boolean locked = true; - while (true) { - if (!firstRun && !partitionsToEvict.isEmptyx() && - !partitionsEvictionOwning.compareAndSet(0, 1)) + while (locked || !partsToEvict.isEmptyx()) { + if (!locked && !partsEvictOwning.compareAndSet(0, 1)) return false; - firstRun = false; - try { - GridDhtLocalPartition part = partitionsToEvict.poll(); - - if (part == null) - return false; + GridDhtLocalPartition part = partsToEvict.poll(); - part.tryEvict(); + if (part != null) + part.tryEvict(); } finally { - boolean res = partitionsEvictionOwning.compareAndSet(1, 0); + if (!partsToEvict.isEmptyx()) + locked = true; + else { + boolean res = partsEvictOwning.compareAndSet(1, 0); - assert res; + assert res; + + locked = false; + } } } + + return true; } }, /*system pool*/ true); }
