Repository: ignite Updated Branches: refs/heads/ignite-3195 [created] 1bde2bdc9
IGNITE-3195 Rebalancing: IgniteConfiguration.rebalanceThreadPoolSize is wrongly treated Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1bde2bdc Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1bde2bdc Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1bde2bdc Branch: refs/heads/ignite-3195 Commit: 1bde2bdc92c7da12c570d3c478d7767d046c2ef3 Parents: c9d08d3 Author: Anton Vinogradov <[email protected]> Authored: Tue Apr 11 15:59:44 2017 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Tue Apr 11 15:59:44 2017 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 75 +++++++++++++++----- 1 file changed, 56 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1bde2bdc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 885106d..6b38f55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.LinkedBlockingDeque; @@ -384,28 +385,64 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) { final int idx = cnt; + // rebQ & rebOwn provides guaranties (efficiently) of maximum threads amount can be used at rebalancing + // specified at IgniteConfiguration.getRebalanceThreadPoolSize() + final ConcurrentLinkedDeque<T2<GridCacheMessage, UUID>> rebQ = new ConcurrentLinkedDeque<>(); + final AtomicBoolean rebOwn = new AtomicBoolean(); + cctx.io().addOrderedHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheMessage>() { - @Override public void apply(final UUID id, final GridCacheMessage m) { - if (!enterBusy()) - return; - - try { - GridCacheContext cacheCtx = cctx.cacheContext(m.cacheId); - - if (cacheCtx != null) { - if (m instanceof GridDhtPartitionSupplyMessage) - cacheCtx.preloader().handleSupplyMessage( - idx, id, (GridDhtPartitionSupplyMessage)m); - else if (m instanceof GridDhtPartitionDemandMessage) - cacheCtx.preloader().handleDemandMessage( - idx, id, (GridDhtPartitionDemandMessage)m); - else - U.error(log, "Unsupported message type: " + m.getClass().getName()); + @Override public void apply(final UUID id0, final GridCacheMessage m0) { + rebQ.add(new T2<>(m0, id0)); + + if (!rebOwn.get() && rebOwn.compareAndSet(false, true)) { + boolean locked = true; + + while (locked || !rebQ.isEmpty()) { + if (!locked && !rebOwn.compareAndSet(false, true)) + return; + + try { + T2<GridCacheMessage, UUID> t = rebQ.poll(); + + if (t != null) { + GridCacheMessage m = t.get1(); + UUID id = t.get2(); + + if (!enterBusy()) + return; + + try { + GridCacheContext cacheCtx = cctx.cacheContext(m.cacheId); + + if (cacheCtx != null) { + if (m instanceof GridDhtPartitionSupplyMessage) + cacheCtx.preloader().handleSupplyMessage( + idx, id, (GridDhtPartitionSupplyMessage)m); + else if (m instanceof GridDhtPartitionDemandMessage) + cacheCtx.preloader().handleDemandMessage( + idx, id, (GridDhtPartitionDemandMessage)m); + else + U.error(log, "Unsupported message type: " + m.getClass().getName()); + } + } + finally { + leaveBusy(); + } + } + } + finally { + if (!rebQ.isEmpty()) + locked = true; + else { + boolean res = rebOwn.compareAndSet(true, false); + + assert res; + + locked = false; + } + } } } - finally { - leaveBusy(); - } } }); }
