ignite-1027 Fixed early rebalance sync future completion.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ad9e4db5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ad9e4db5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ad9e4db5 Branch: refs/heads/ignite-1.5.1 Commit: ad9e4db5b87b064d13db4f9251c25efd535fb9e8 Parents: 9b60c75 Author: sboikov <sboi...@gridgain.com> Authored: Thu Dec 3 10:45:30 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Dec 3 10:45:30 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 17 ++-- .../processors/cache/GridCachePreloader.java | 11 ++- .../dht/preloader/GridDhtPartitionDemander.java | 41 ++++---- .../dht/preloader/GridDhtPreloader.java | 4 +- .../preloader/GridDhtPreloaderAssignments.java | 19 +++- .../dht/GridCacheDhtPreloadDelayedSelfTest.java | 37 +++++--- ...cingDelayedPartitionMapExchangeSelfTest.java | 9 +- .../GridCacheRebalancingAsyncSelfTest.java | 3 +- .../GridCacheRebalancingSyncCheckDataTest.java | 98 ++++++++++++++++++++ .../GridCacheRebalancingSyncSelfTest.java | 55 +++++------ ...eRebalancingUnmarshallingFailedSelfTest.java | 6 +- .../testsuites/IgniteCacheTestSuite3.java | 2 + 12 files changed, 223 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index b13a5af..a0f7f93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1399,8 +1399,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana waitList.add(cctx.cacheContext(cId).name()); } - Callable<Boolean> r = cacheCtx.preloader().addAssignments( - assignsMap.get(cacheId), forcePreload, waitList, cnt); + Callable<Boolean> r = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId), + forcePreload, + waitList, + cnt); if (r != null) { U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() + @@ -1425,7 +1427,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); - if (marshR != null) + if (marshR != null) { try { marshR.call(); //Marshaller cache rebalancing launches in sync way. } @@ -1435,6 +1437,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana continue; } + } final GridFutureAdapter fut = new GridFutureAdapter(); @@ -1463,17 +1466,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana fut.onDone(); } } - }, /*system pool*/ true); + }, /*system pool*/true); } - else + else { U.log(log, "Skipping rebalancing (obsolete exchange ID) " + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); + } } - else + else { U.log(log, "Skipping rebalancing (nothing scheduled) " + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); + } } } catch (IgniteInterruptedCheckedException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 8e1164b..c8fcb90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -86,9 +86,9 @@ public interface GridCachePreloader { /** * @param exchFut Exchange future to assign. - * @return Assignments. + * @return Assignments or {@code null} if detected that there are pending exchanges. */ - public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut); + @Nullable public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut); /** * Adds assignments to preloader. @@ -97,9 +97,12 @@ public interface GridCachePreloader { * @param forcePreload Force preload flag. * @param caches Rebalancing of these caches will be finished before this started. * @param cnt Counter. + * @return Rebalancing closure. */ - public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, - Collection<String> caches, int cnt); + public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, + boolean forcePreload, + Collection<String> caches, + int cnt); /** * @param p Preload predicate. http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index eb9e97f..ced0d10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -251,6 +251,7 @@ public class GridDhtPartitionDemander { /** * @param name Cache name. * @param fut Future. + * @throws IgniteCheckedException If failed. */ private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) throws IgniteCheckedException { if (log.isDebugEnabled()) @@ -283,7 +284,7 @@ public class GridDhtPartitionDemander { * @param force {@code True} if dummy reassign. * @param caches Rebalancing of these caches will be finished before this started. * @param cnt Counter. - * @throws IgniteCheckedException If failed. + * @return Rebalancing closure. */ Callable<Boolean> addAssignments(final GridDhtPreloaderAssignments assigns, boolean force, final Collection<String> caches, int cnt) { @@ -293,25 +294,24 @@ public class GridDhtPartitionDemander { long delay = cctx.config().getRebalanceDelay(); if (delay == 0 || force) { - assert assigns != null; - final RebalanceFuture oldFut = rebalanceFut; final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, oldFut.isInitial(), cnt); if (!oldFut.isInitial()) oldFut.cancel(); - else + else { fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> future) { + @Override public void apply(IgniteInternalFuture<Boolean> fut) { oldFut.onDone(fut.result()); } }); + } rebalanceFut = fut; if (assigns.isEmpty()) { - fut.doneIfEmpty(); + fut.doneIfEmpty(assigns.cancelled()); return null; } @@ -357,6 +357,9 @@ public class GridDhtPartitionDemander { /** * @param fut Future. + * @param assigns Assignments. + * @throws IgniteCheckedException If failed. + * @return */ private boolean requestPartitions( RebalanceFuture fut, @@ -370,7 +373,7 @@ public class GridDhtPartitionDemander { GridDhtPartitionDemandMessage d = e.getValue(); - fut.appendPartitions(node.id(), d.partitions());//Future preparation. + fut.appendPartitions(node.id(), d.partitions()); //Future preparation. } for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) { @@ -413,7 +416,8 @@ public class GridDhtPartitionDemander { initD.timeout(cctx.config().getRebalanceTimeout()); synchronized (fut) { - if (!fut.isDone())// Future can be already cancelled at this moment and all failovers happened. + if (!fut.isDone()) + // Future can be already cancelled at this moment and all failovers happened. // New requests will not be covered by failovers. cctx.io().sendOrderedMessage(node, rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout()); @@ -427,9 +431,12 @@ public class GridDhtPartitionDemander { } } else { - U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() + - ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() + - ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]"); + U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() + + ", mode=" + cfg.getRebalanceMode() + + ", fromNode=" + node.id() + + ", partitionsCount=" + parts.size() + + ", topology=" + fut.topologyVersion() + + ", updateSeq=" + fut.updateSeq + "]"); d.timeout(cctx.config().getRebalanceTimeout()); d.workerId(0);//old api support. @@ -832,9 +839,9 @@ public class GridDhtPartitionDemander { } /** - * + * @param cancelled Is cancelled. */ - private void doneIfEmpty() { + private void doneIfEmpty(boolean cancelled) { synchronized (this) { if (isDone()) return; @@ -845,14 +852,14 @@ public class GridDhtPartitionDemander { log.debug("Rebalancing is not required [cache=" + cctx.name() + ", topology=" + topVer + "]"); - checkIsDone(); + checkIsDone(cancelled); } } /** * Cancels this future. * - * @return {@code true}. + * @return {@code True}. */ @Override public boolean cancel() { synchronized (this) { @@ -860,7 +867,7 @@ public class GridDhtPartitionDemander { return true; U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name() - + ", topology=" + topologyVersion()); + + ", topology=" + topologyVersion() + ']'); if (!cctx.kernalContext().isStopping()) { for (UUID nodeId : remaining.keySet()) @@ -1012,7 +1019,7 @@ public class GridDhtPartitionDemander { preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); if (log.isDebugEnabled()) - log.debug("Completed rebalance future."); + log.debug("Completed rebalance future: " + this); cctx.shared().exchange().scheduleResendPartitions(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 3e3cee3..9a6246f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -324,7 +324,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { log.debug("Skipping assignments creation, exchange worker has pending assignments: " + exchFut.exchangeId()); - break; + assigns.cancelled(true); + + return assigns; } // If partition belongs to local node. http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java index 3583967..3f82c9b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java @@ -37,19 +37,36 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode, /** Last join order. */ private final AffinityTopologyVersion topVer; + /** */ + private boolean cancelled; + /** * @param exchFut Exchange future. * @param topVer Last join order. */ public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture exchFut, AffinityTopologyVersion topVer) { assert exchFut != null; - assert topVer.topologyVersion() > 0; + assert topVer.topologyVersion() > 0 : topVer; this.exchFut = exchFut; this.topVer = topVer; } /** + * @return {@code True} if assignments creation was cancelled. + */ + public boolean cancelled() { + return cancelled; + } + + /** + * @param cancelled {@code True} if assignments creation was cancelled. + */ + public void cancelled(boolean cancelled) { + this.cancelled = cancelled; + } + + /** * @return Exchange future. */ GridDhtPartitionsExchangeFuture exchangeFuture() { http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java index 9d6e82f..0b610f3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CachePeekMode; @@ -35,7 +34,6 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; @@ -51,6 +49,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; @@ -107,7 +106,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest { stopAllGrids(); } - /** @throws Exception If failed. */ + /** + * @throws Exception If failed. + */ public void testManualPreload() throws Exception { delay = -1; @@ -184,7 +185,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest { checkCache(c2, cnt); } - /** @throws Exception If failed. */ + /** + * @throws Exception If failed. + */ public void testDelayedPreload() throws Exception { delay = PRELOAD_DELAY; @@ -238,9 +241,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest { checkMaps(false, d0, d1, d2); - assert l1.await(PRELOAD_DELAY * 3 / 2, TimeUnit.MILLISECONDS); + assert l1.await(PRELOAD_DELAY * 3 / 2, MILLISECONDS); - assert l2.await(PRELOAD_DELAY * 3 / 2, TimeUnit.MILLISECONDS); + assert l2.await(PRELOAD_DELAY * 3 / 2, MILLISECONDS); U.sleep(1000); @@ -253,7 +256,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest { checkCache(c2, cnt); } - /** @throws Exception If failed. */ + /** + * @throws Exception If failed. + */ public void testAutomaticPreload() throws Exception { delay = 0; preloadMode = CacheRebalanceMode.SYNC; @@ -284,7 +289,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest { checkCache(c2, cnt); } - /** @throws Exception If failed. */ + /** + * @throws Exception If failed. + */ public void testAutomaticPreloadWithEmptyCache() throws Exception { preloadMode = SYNC; @@ -331,7 +338,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest { } } - /** @throws Exception If failed. */ + /** + * @throws Exception If failed. + */ public void testManualPreloadSyncMode() throws Exception { preloadMode = CacheRebalanceMode.SYNC; delay = -1; @@ -344,7 +353,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest { } } - /** @throws Exception If failed. */ + /** + * @throws Exception If failed. + */ public void testPreloadManyNodes() throws Exception { delay = 0; preloadMode = ASYNC; @@ -419,9 +430,11 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest { * * @param strict Strict check flag. * @param caches Maps to compare. + * @throws Exception If failed. */ - private void checkMaps(final boolean strict, final GridDhtCacheAdapter<String, Integer>... caches) - throws IgniteInterruptedCheckedException { + @SafeVarargs + private final void checkMaps(final boolean strict, final GridDhtCacheAdapter<String, Integer>... caches) + throws Exception { if (caches.length < 2) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java index a1ea7ad..2890fcb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java @@ -73,20 +73,20 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri public class DelayableCommunicationSpi extends TcpCommunicationSpi { /** {@inheritDoc} */ @Override public void sendMessage(final ClusterNode node, final Message msg, - final IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { + final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { final Object msg0 = ((GridIoMessage)msg).message(); if (msg0 instanceof GridDhtPartitionsFullMessage && record && ((GridDhtPartitionsFullMessage)msg0).exchangeId() == null) { rs.putIfAbsent(node.id(), new Runnable() { @Override public void run() { - DelayableCommunicationSpi.super.sendMessage(node, msg, ackClosure); + DelayableCommunicationSpi.super.sendMessage(node, msg, ackC); } }); } else try { - super.sendMessage(node, msg, ackClosure); + super.sendMessage(node, msg, ackC); } catch (Exception e) { U.log(null, e); @@ -144,9 +144,8 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri awaitPartitionMapExchange(); - for (Runnable r : rs.values()) { + for (Runnable r : rs.values()) r.run(); - } U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages. http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java index 7759c70..bcda0da 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java @@ -33,9 +33,8 @@ public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncS @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration iCfg = super.getConfiguration(gridName); - for (CacheConfiguration cacheCfg : iCfg.getCacheConfiguration()) { + for (CacheConfiguration cacheCfg : iCfg.getCacheConfiguration()) cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); - } return iCfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java new file mode 100644 index 0000000..5e4a5c4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; + +/** + * + */ +public class GridCacheRebalancingSyncCheckDataTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + ccfg.setCacheMode(REPLICATED); + ccfg.setRebalanceMode(SYNC); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testDataRebalancing() throws Exception { + Ignite ignite = startGrid(0); + + final int KEYS = 10_000; + + IgniteCache<Object, Object> cache = ignite.cache(null); + + for (int i = 0; i < KEYS; i++) + cache.put(i, i); + + + for (int i = 0; i < 3; i++) { + log.info("Iteration: " + i); + + final AtomicInteger idx = new AtomicInteger(1); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + try(Ignite ignite = startGrid(idx.getAndIncrement())) { + IgniteCache<Object, Object> cache = ignite.cache(null); + + for (int i = 0; i < KEYS; i++) + assertNotNull(cache.localPeek(i)); + } + + return null; + } + }, 5, "start-node"); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index 8c5cd40..3b25bd7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -45,19 +45,19 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); /** */ - private static int TEST_SIZE = 100_000; + private static final int TEST_SIZE = 100_000; /** partitioned cache name. */ - protected static String CACHE_NAME_DHT_PARTITIONED = "cacheP"; + protected static final String CACHE_NAME_DHT_PARTITIONED = "cacheP"; /** partitioned cache 2 name. */ - protected static String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2"; + protected static final String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2"; /** replicated cache name. */ - protected static String CACHE_NAME_DHT_REPLICATED = "cacheR"; + protected static final String CACHE_NAME_DHT_REPLICATED = "cacheR"; /** replicated cache 2 name. */ - protected static String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2"; + protected static final String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2"; /** */ private volatile boolean concurrentStartFinished; @@ -122,6 +122,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** * @param ignite Ignite. + * @param from Start from key. + * @param iter Iteration. */ protected void generateData(Ignite ignite, int from, int iter) { generateData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter); @@ -132,6 +134,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** * @param ignite Ignite. + * @param name Cache name. + * @param from Start from key. + * @param iter Iteration. */ protected void generateData(Ignite ignite, String name, int from, int iter) { for (int i = from; i < from + TEST_SIZE; i++) { @@ -144,9 +149,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** * @param ignite Ignite. - * @throws IgniteCheckedException Exception. + * @param from Start from key. + * @param iter Iteration. */ - protected void checkData(Ignite ignite, int from, int iter) throws IgniteCheckedException { + protected void checkData(Ignite ignite, int from, int iter) { checkData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter); checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter); checkData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter); @@ -155,10 +161,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** * @param ignite Ignite. + * @param from Start from key. + * @param iter Iteration. * @param name Cache name. - * @throws IgniteCheckedException Exception. */ - protected void checkData(Ignite ignite, String name, int from, int iter) throws IgniteCheckedException { + protected void checkData(Ignite ignite, String name, int from, int iter) { for (int i = from; i < from + TEST_SIZE; i++) { if (i % (TEST_SIZE / 10) == 0) log.info("<" + name + "> Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ")."); @@ -169,7 +176,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { } /** - * @throws Exception Exception + * @throws Exception If failed. */ public void testSimpleRebalancing() throws Exception { Ignite ignite = startGrid(0); @@ -206,7 +213,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { } /** - * @throws Exception Exception + * @throws Exception If failed. */ public void testLoadRebalancing() throws Exception { final Ignite ignite = startGrid(0); @@ -240,14 +247,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { Thread t2 = new Thread() { @Override public void run() { - while (!concurrentStartFinished) { - try { - checkData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0); - } - catch (IgniteCheckedException e) { - e.printStackTrace(); - } - } + while (!concurrentStartFinished) + checkData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0); } }; @@ -282,7 +283,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { * @param id Node id. * @param major Major ver. * @param minor Minor ver. - * @throws IgniteCheckedException Exception. + * @throws IgniteCheckedException If failed. */ protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException { waitForRebalancing(id, new AffinityTopologyVersion(major, minor)); @@ -291,7 +292,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** * @param id Node id. * @param major Major ver. - * @throws IgniteCheckedException Exception. + * @throws IgniteCheckedException If failed. */ protected void waitForRebalancing(int id, int major) throws IgniteCheckedException { waitForRebalancing(id, new AffinityTopologyVersion(major)); @@ -300,7 +301,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** * @param id Node id. * @param top Topology version. - * @throws IgniteCheckedException + * @throws IgniteCheckedException If failed. */ protected void waitForRebalancing(int id, AffinityTopologyVersion top) throws IgniteCheckedException { boolean finished = false; @@ -327,6 +328,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** * */ + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") protected void checkSupplyContextMapIsEmpty() { for (Ignite g : G.allGrids()) { for (GridCacheAdapter c : ((IgniteEx)g).context().cache().internalCaches()) { @@ -342,12 +344,13 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { } } + /** {@inheritDoc} */ @Override protected long getTestTimeout() { return 5 * 60_000; } /** - * @throws Exception + * @throws Exception If failed. */ public void testComplexRebalancing() throws Exception { final Ignite ignite = startGrid(0); @@ -368,9 +371,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { startGrid(1); startGrid(2); - while (!concurrentStartFinished2) { + while (!concurrentStartFinished2) U.sleep(10); - } waitForRebalancing(0, 5, 0); waitForRebalancing(1, 5, 0); @@ -387,9 +389,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { grid(0).getOrCreateCache(cacheRCfg); - while (!concurrentStartFinished3) { + while (!concurrentStartFinished3) U.sleep(10); - } concurrentStartFinished = true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java index 506f1c2..7e35906 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java @@ -125,9 +125,8 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA startGrid(0); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 100; i++) grid(0).cache(CACHE).put(new TestKey(String.valueOf(i)), i); - } readCnt.set(1); @@ -135,9 +134,8 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA readCnt.set(Integer.MAX_VALUE); - for (int i = 0; i < 50; i++) { + for (int i = 0; i < 50; i++) assert grid(1).cache(CACHE).get(new TestKey(String.valueOf(i))) != null; - } stopGrid(0); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java index b02d022..176ab3f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java @@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePut import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRabalancingDelayedPartitionMapExchangeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncCheckDataTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingUnmarshallingFailedSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheDaemonNodeReplicatedSelfTest; @@ -140,6 +141,7 @@ public class IgniteCacheTestSuite3 extends TestSuite { suite.addTestSuite(GridCacheOrderedPreloadingSelfTest.class); suite.addTestSuite(GridCacheRebalancingSyncSelfTest.class); + suite.addTestSuite(GridCacheRebalancingSyncCheckDataTest.class); suite.addTestSuite(GridCacheRebalancingUnmarshallingFailedSelfTest.class); suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class); suite.addTestSuite(GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.class);