Repository: ignite Updated Branches: refs/heads/ignite-1093-2 4baf249db -> 9dd41d5f1
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9dd41d5f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9dd41d5f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9dd41d5f Branch: refs/heads/ignite-1093-2 Commit: 9dd41d5f18a54a66fb59dd520a855023d3e452de Parents: 4baf249 Author: Anton Vinogradov <[email protected]> Authored: Sun Oct 11 00:46:12 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Sun Oct 11 00:46:12 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 65 ++++++++++++---- .../processors/cache/GridCachePreloader.java | 5 +- .../cache/GridCachePreloaderAdapter.java | 6 +- .../processors/cache/GridCacheProcessor.java | 50 ------------ .../distributed/dht/GridDhtLocalPartition.java | 12 +-- .../distributed/dht/GridDhtPartitionState.java | 2 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 4 +- .../dht/preloader/GridDhtPartitionDemander.java | 81 ++++++-------------- .../dht/preloader/GridDhtPreloader.java | 7 +- .../GridCacheRebalancingSyncSelfTest.java | 12 ++- .../config/benchmark-rebalancing.properties | 2 - 11 files changed, 109 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 2dbbe3c..8057d18 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -21,11 +21,17 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -74,6 +80,7 @@ import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; +import org.apache.ignite.thread.IgniteThreadFactory; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -134,6 +141,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** */ private GridFutureAdapter<?> reconnectExchangeFut; + /** */ + private ExecutorService rebalancingOrderedExecutorService; + /** * Partition map futures. * This set also contains already completed exchange futures to address race conditions when coordinator @@ -249,6 +259,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Override protected void start0() throws IgniteCheckedException { super.start0(); + rebalancingOrderedExecutorService = Executors.newSingleThreadExecutor( + new IgniteThreadFactory(cctx.gridName(), "rebalancing-assigns")); + exchWorker = new ExchangeWorker(); cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, @@ -453,6 +466,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Override protected void stop0(boolean cancel) { super.stop0(cancel); + rebalancingOrderedExecutorService.shutdownNow(); + // Do not allow any activity in exchange manager after stop. busyLock.writeLock().lock(); @@ -1263,27 +1278,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } if (assignsMap != null) { + NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>(); + //Marshaller cache first. int mId = CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME); - GridDhtPreloaderAssignments mA = assignsMap.get(mId); - - assert mA != null; + orderMap.put(-2, new ArrayList<Integer>(1)); - GridCacheContext<K, V> mCacheCtx = cctx.cacheContext(mId); - - mCacheCtx.preloader().addAssignments(mA, forcePreload); + orderMap.get(-2).add(mId); //Utility cache second. int uId = CU.cacheId(GridCacheUtils.UTILITY_CACHE_NAME); - GridDhtPreloaderAssignments uA = assignsMap.get(uId); - - assert uA != null; - - GridCacheContext<K, V> uCacheCtx = cctx.cacheContext(uId); + orderMap.put(-1, new ArrayList<Integer>(1)); - uCacheCtx.preloader().addAssignments(uA, forcePreload); + orderMap.get(-1).add(uId); //Others. for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) { @@ -1292,7 +1301,37 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (cacheId != uId && cacheId != mId) { GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); - cacheCtx.preloader().addAssignments(e.getValue(), forcePreload); + int order = cacheCtx.config().getRebalanceOrder(); + + if (orderMap.get(order) == null) + orderMap.put(order, new LinkedList<Integer>()); + + orderMap.get(order).add(cacheId); + } + } + + //Ordered rebalance scheduling. + for (Integer order : orderMap.keySet()) { + for (Integer cacheId : orderMap.get(order)) { + GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + + List<String> waitList = new LinkedList<>(); + + for (List<Integer> cIds : orderMap.headMap(order).values()) { + for (Integer cId : cIds) { + waitList.add(cctx.cacheContext(cId).name()); + } + } + + Callable c = cacheCtx.preloader().addAssignments( + assignsMap.get(cacheId), forcePreload, waitList); + + if (c != null) { + U.log(log, "Rebalancing scheduled: [cache=" + cacheCtx.name() + + " , waitList=" + waitList.toString() + "]"); + + rebalancingOrderedExecutorService.submit(c); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 622d01e..878f985 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collection; import java.util.UUID; +import java.util.concurrent.Callable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -93,8 +94,10 @@ public interface GridCachePreloader { * * @param assignments Assignments to add. * @param forcePreload Force preload flag. + * @param caches Rebalancing of these caches will be finished before this started. */ - public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException; + public Callable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, Collection<String> caches) + throws IgniteCheckedException; /** * @param p Preload predicate. http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index c5f503f..0aae0dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collection; import java.util.UUID; +import java.util.concurrent.Callable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.affinity.AffinityFunction; @@ -156,7 +157,8 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ - @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException { - // No-op. + @Override public Callable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, Collection<String> caches) + throws IgniteCheckedException { + return null; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 99b9269..9b2b558 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -31,9 +31,7 @@ import java.util.LinkedList; import java.util.List; import java.util.ListIterator; import java.util.Map; -import java.util.NavigableMap; import java.util.Set; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -99,7 +97,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManag import org.apache.ignite.internal.processors.plugin.CachePluginManager; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.F0; -import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -162,12 +159,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Map of proxies. */ private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies; - /** Map of preload finish futures grouped by preload order. */ - private final NavigableMap<Integer, IgniteInternalFuture<?>> preloadFuts; - - /** Maximum detected rebalance order. */ - private int maxRebalanceOrder; - /** Caches stop sequence. */ private final Deque<String> stopSeq; @@ -209,7 +200,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { caches = new ConcurrentHashMap<>(); jCacheProxies = new ConcurrentHashMap<>(); - preloadFuts = new TreeMap<>(); stopSeq = new LinkedList<>(); } @@ -612,8 +602,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { "Deployment mode for cache is not CONTINUOUS or SHARED."); } - maxRebalanceOrder = validatePreloadOrder(ctx.config().getCacheConfiguration()); - ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class, new CustomEventListener<DynamicCacheChangeBatch>() { @Override public void onCustomEvent(ClusterNode snd, @@ -844,31 +832,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) mgr.onKernalStart(false); - for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { - GridCacheAdapter cache = e.getValue(); - - if (maxRebalanceOrder > 0) { - CacheConfiguration cfg = cache.configuration(); - - int order = cfg.getRebalanceOrder(); - - if (order > 0 && order != maxRebalanceOrder && cfg.getCacheMode() != LOCAL) { - GridCompoundFuture fut = (GridCompoundFuture)preloadFuts.get(order); - - if (fut == null) { - fut = new GridCompoundFuture<>(); - - preloadFuts.put(order, fut); - } - - fut.add(cache.preloader().syncFuture()); - } - } - } - - for (IgniteInternalFuture<?> fut : preloadFuts.values()) - ((GridCompoundFuture<Object, Object>)fut).markInitialized(); - for (GridCacheAdapter<?, ?> cache : caches.values()) onKernalStart(cache); @@ -2770,19 +2733,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * Gets preload finish future for preload-ordered cache with given order. I.e. will get compound preload future - * with maximum order less than {@code order}. - * - * @param order Cache order. - * @return Compound preload future or {@code null} if order is minimal order found. - */ - @Nullable public IgniteInternalFuture<?> orderedPreloadFuture(int order) { - Map.Entry<Integer, IgniteInternalFuture<?>> entry = preloadFuts.lowerEntry(order); - - return entry == null ? null : entry.getValue(); - } - - /** * @param spaceName Space name. * @param keyBytes Key bytes. * @param valBytes Value bytes. http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 9fea4f9..3a5ef5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -154,7 +154,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * @return {@code false} If such reservation already added. */ public boolean addReservation(GridDhtPartitionsReservation r) { - assert state.getReference() != EVICTED : "we can reserve only active partitions"; + assert state.getReference() != EVICTED && state.getReference() != EVICTING : + "we can reserve only active partitions"; assert state.getStamp() != 0 : "partition must be already reserved before adding group reservation"; return reservations.addIfAbsent(r); @@ -260,7 +261,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, void onAdded(GridDhtCacheEntry entry) { GridDhtPartitionState state = state(); - if (state == EVICTED) + if (state == EVICTED || state == EVICTING) throw new GridDhtInvalidPartitionException(id, "Adding entry to invalid partition [part=" + id + ']'); map.put(entry.key(), entry); @@ -389,7 +390,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, GridDhtPartitionState s = state.getReference(); - if (s == EVICTED) + if (s == EVICTED || s == EVICTING) return false; if (state.compareAndSet(s, s, reservations, reservations + 1)) @@ -568,9 +569,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, return true; } - else { - assert false : "expected EVICTING state"; - } + + assert false : "expected EVICTING state"; return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java index 4f6d59d..9849c0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java @@ -53,6 +53,6 @@ public enum GridDhtPartitionState { * @return {@code True} if state is active or owning. */ public boolean active() { - return this != EVICTED; + return this != EVICTED && this != EVICTING; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 761bbb0..09ce270 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1212,7 +1212,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter GridDhtPartitionState state = entry.context().topology().partitionState(n.id(), entry.cached().partition()); - if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) { + if (state != GridDhtPartitionState.OWNING && + state != GridDhtPartitionState.EVICTING && + state != GridDhtPartitionState.EVICTED) { CacheObject procVal = entry.entryProcessorCalculatedValue(); entry.op(procVal == null ? DELETE : UPDATE); http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 4e274f3..e585eca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; -import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; @@ -66,7 +65,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -224,8 +222,10 @@ public class GridDhtPartitionDemander { try { SyncFuture wFut = (SyncFuture)cctx.kernalContext().cache().internalCache(name).preloader().syncFuture(); - if (!topologyChanged(fut)) - wFut.get(); + if (!topologyChanged(fut)) { + if (!wFut.get()) + fut.cancel(); + } else { fut.cancel(); } @@ -247,10 +247,11 @@ public class GridDhtPartitionDemander { /** * @param assigns Assignments. * @param force {@code True} if dummy reassign. + * @param caches Rebalancing of these caches will be finished before this started. * @throws IgniteCheckedException Exception */ - - void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) throws IgniteCheckedException { + Callable addAssignments(final GridDhtPreloaderAssignments assigns, boolean force, final Collection<String> caches) + throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Adding partition assignments: " + assigns); @@ -261,9 +262,9 @@ public class GridDhtPartitionDemander { final SyncFuture oldFut = syncFut; - final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy(), ++updateSeq); + final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isInitial(), ++updateSeq); - if (!oldFut.isDummy()) + if (!oldFut.isInitial()) oldFut.cancel(); else fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { @@ -279,72 +280,36 @@ public class GridDhtPartitionDemander { fut.cancel(); - return; + return null; } if (assigns.isEmpty()) { fut.doneIfEmpty(); - return; + return null; } if (topologyChanged(fut)) { fut.cancel(); - return; + return null; } - cctx.closures().callLocalSafe(new Callable<Boolean>() { - @Override public Boolean call() { - if (!CU.isMarshallerCache(cctx.name())) { - waitForCacheRebalancing(GridCacheUtils.MARSH_CACHE_NAME, fut); - - if (!CU.isUtilityCache(cctx.name())) { - waitForCacheRebalancing(GridCacheUtils.UTILITY_CACHE_NAME, fut); - } - } - - int rebalanceOrder = cctx.config().getRebalanceOrder(); - - if (rebalanceOrder > 0) { - IgniteInternalFuture<?> oFut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder); - - try { - if (oFut != null) { - if (log.isDebugEnabled()) - log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() + - ", rebalanceOrder=" + rebalanceOrder + ']'); - - if (!topologyChanged(fut)) - oFut.get(); - else { - fut.cancel(); - - return false; - } - } - } - catch (IgniteInterruptedCheckedException ignored) { - if (log.isDebugEnabled()) { - log.debug("Failed to wait for ordered rebalance future (grid is stopping): " + - "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']'); - fut.cancel(); - - return false; - } - } - catch (IgniteCheckedException e) { - fut.cancel(); + return new Callable<Boolean>() { + @Override + public Boolean call() { + for (String c : caches) { + waitForCacheRebalancing(c, fut); - throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e); - } + if (fut.isDone()) + return false; } requestPartitions(fut, assigns); return true; } - }); + }; } else if (delay > 0) { GridTimeoutObject obj = lastTimeoutObj.get(); @@ -370,6 +335,8 @@ public class GridDhtPartitionDemander { cctx.time().addTimeoutObject(obj); } + + return null; } /** @@ -843,9 +810,9 @@ public class GridDhtPartitionDemander { } /** - * @return Is dummy (created at demander creation). + * @return Is initial (created at demander creation). */ - private boolean isDummy() { + private boolean isInitial() { return topVer == null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 06f4522..fbc74ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -396,9 +397,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - @Override public void addAssignments(GridDhtPreloaderAssignments assignments, - boolean forcePreload) throws IgniteCheckedException { - demander.addAssignments(assignments, forcePreload); + @Override public Callable addAssignments(GridDhtPreloaderAssignments assignments, + boolean forcePreload, Collection<String> caches) throws IgniteCheckedException { + return demander.addAssignments(assignments, forcePreload, caches); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index 028a8fc..b6d11c5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -88,6 +88,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { cachePCfg.setBackups(1); cachePCfg.setRebalanceBatchSize(1); cachePCfg.setRebalanceBatchesCount(1); + cachePCfg.setRebalanceOrder(2); CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>(); @@ -95,6 +96,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { cachePCfg2.setCacheMode(CacheMode.PARTITIONED); cachePCfg2.setRebalanceMode(CacheRebalanceMode.SYNC); cachePCfg2.setBackups(1); + cachePCfg2.setRebalanceOrder(2); + // cachePCfg2.setRebalanceDelay(5000); CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>(); @@ -103,13 +106,13 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC); cacheRCfg.setRebalanceBatchSize(1); cacheRCfg.setRebalanceBatchesCount(Integer.MAX_VALUE); - cacheRCfg.setRebalanceDelay(5000); CacheConfiguration<Integer, Integer> cacheRCfg2 = new CacheConfiguration<>(); cacheRCfg2.setName(CACHE_NAME_DHT_REPLICATED_2); cacheRCfg2.setCacheMode(CacheMode.REPLICATED); cacheRCfg2.setRebalanceMode(CacheRebalanceMode.SYNC); + cacheRCfg2.setRebalanceOrder(4); iCfg.setCacheConfiguration(cachePCfg, cachePCfg2, cacheRCfg, cacheRCfg2); @@ -356,6 +359,12 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { U.sleep(10); } + waitForRebalancing(0, 5, 0); + waitForRebalancing(1, 5, 0); + waitForRebalancing(2, 5, 0); + waitForRebalancing(3, 5, 0); + waitForRebalancing(4, 5, 0); + awaitPartitionMapExchange(); //New cache should start rebalancing. @@ -409,6 +418,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { t2.join(); t3.join(); + waitForRebalancing(0, 5, 1); waitForRebalancing(1, 5, 1); waitForRebalancing(2, 5, 1); waitForRebalancing(3, 5, 1); http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/yardstick/config/benchmark-rebalancing.properties ---------------------------------------------------------------------- diff --git a/modules/yardstick/config/benchmark-rebalancing.properties b/modules/yardstick/config/benchmark-rebalancing.properties index 604c7ac..796e49f 100644 --- a/modules/yardstick/config/benchmark-rebalancing.properties +++ b/modules/yardstick/config/benchmark-rebalancing.properties @@ -70,8 +70,6 @@ nodesNum=$((`echo ${SERVER_HOSTS} | tr ',' '\n' | wc -l` + `echo ${DRIVER_HOSTS} # Run configuration. CONFIGS="\ -cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 140 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet2 -cl -r 20000000 -cn rebalance2,\ --cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 140 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet2_1 -cl -r 20000000 -cn rebalance2,\ --cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 140 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet2_2 -cl -r 20000000 -cn rebalance2,\ " #-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 140 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet1 -cl -r 20000000 -cn rebalance1,\ #-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 140 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet2 -cl -r 20000000 -cn rebalance2,\
