http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 998c720..c634ff5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -17,15 +17,18 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; @@ -47,27 +50,42 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.GPC; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import org.jsr166.ConcurrentLinkedDeque8; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; /** * DHT cache preloader. */ public class GridDhtPreloader extends GridCachePreloaderAdapter { + /** + * Rebalancing was refactored at version 1.5.0, but backward compatibility to previous implementation was saved. + * Node automatically chose communication protocol depends on remote node's version. + * Backward compatibility may be removed at Ignite 2.x. + */ + public static final IgniteProductVersion REBALANCING_VER_2_SINCE = IgniteProductVersion.fromString("1.5.0"); + /** Default preload resend timeout. */ public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500; @@ -81,10 +99,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap(); /** Partition suppliers. */ - private GridDhtPartitionSupplyPool supplyPool; + private GridDhtPartitionSupplier supplier; /** Partition demanders. */ - private GridDhtPartitionDemandPool demandPool; + private GridDhtPartitionDemander demander; /** Start future. */ private GridFutureAdapter<Object> startFut; @@ -92,10 +110,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** Busy lock to prevent activities from accessing exchanger while it's stopping. */ private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); + /** Demand lock. */ + private final ReadWriteLock demandLock = new ReentrantReadWriteLock(); + /** Pending affinity assignment futures. */ private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts = new ConcurrentHashMap8<>(); + /** */ + private final ConcurrentLinkedDeque8<GridDhtLocalPartition> partsToEvict = new ConcurrentLinkedDeque8<>(); + + /** */ + private final AtomicInteger partsEvictOwning = new AtomicInteger(); + /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -179,8 +206,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } }); - supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock); - demandPool = new GridDhtPartitionDemandPool(cctx, busyLock); + supplier = new GridDhtPartitionSupplier(cctx); + demander = new GridDhtPartitionDemander(cctx, busyLock); + + supplier.start(); + demander.start(); cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); } @@ -197,19 +227,16 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { final long startTopVer = loc.order(); topVer.setIfGreater(startTopVer); - - supplyPool.start(); - demandPool.start(); } /** {@inheritDoc} */ @Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) { super.preloadPredicate(preloadPred); - assert supplyPool != null && demandPool != null : "preloadPredicate may be called only after start()"; + assert supplier != null && demander != null : "preloadPredicate may be called only after start()"; - supplyPool.preloadPredicate(preloadPred); - demandPool.preloadPredicate(preloadPred); + supplier.preloadPredicate(preloadPred); + demander.preloadPredicate(preloadPred); } /** {@inheritDoc} */ @@ -223,37 +250,109 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { // Acquire write busy lock. busyLock.writeLock().lock(); - if (supplyPool != null) - supplyPool.stop(); + if (supplier != null) + supplier.stop(); - if (demandPool != null) - demandPool.stop(); + if (demander != null) + demander.stop(); top = null; } /** {@inheritDoc} */ @Override public void onInitialExchangeComplete(@Nullable Throwable err) { - if (err == null) { + if (err == null) startFut.onDone(); + else + startFut.onDone(err); + } + + /** {@inheritDoc} */ + @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) { + demander.updateLastExchangeFuture(lastFut); + } + + /** {@inheritDoc} */ + @Override public void onTopologyChanged(AffinityTopologyVersion topVer) { + supplier.onTopologyChanged(topVer); + } + + /** {@inheritDoc} */ + @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { + // No assignments for disabled preloader. + GridDhtPartitionTopology top = cctx.dht().topology(); + + if (!cctx.rebalanceEnabled()) + return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); + + int partCnt = cctx.affinity().partitions(); - final long start = U.currentTimeMillis(); + assert exchFut.forcePreload() || exchFut.dummyReassign() || + exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) : + "Topology version mismatch [exchId=" + exchFut.exchangeId() + + ", topVer=" + top.topologyVersion() + ']'; - final CacheConfiguration cfg = cctx.config(); + GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); - if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) { - U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name()); + AffinityTopologyVersion topVer = assigns.topologyVersion(); - demandPool.syncFuture().listen(new CI1<Object>() { - @Override public void apply(Object t) { - U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " + - "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]"); + for (int p = 0; p < partCnt; p++) { + if (cctx.shared().exchange().hasPendingExchange()) { + if (log.isDebugEnabled()) + log.debug("Skipping assignments creation, exchange worker has pending assignments: " + + exchFut.exchangeId()); + + break; + } + + // If partition belongs to local node. + if (cctx.affinity().localNode(p, topVer)) { + GridDhtLocalPartition part = top.localPartition(p, topVer, true); + + assert part != null; + assert part.id() == p; + + if (part.state() != MOVING) { + if (log.isDebugEnabled()) + log.debug("Skipping partition assignment (state is not MOVING): " + part); + + continue; // For. + } + + Collection<ClusterNode> picked = pickedOwners(p, topVer); + + if (picked.isEmpty()) { + top.own(part); + + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { + DiscoveryEvent discoEvt = exchFut.discoveryEvent(); + + cctx.events().addPreloadEvent(p, + EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), + discoEvt.type(), discoEvt.timestamp()); } - }); + + if (log.isDebugEnabled()) + log.debug("Owning partition as there are no other owners: " + part); + } + else { + ClusterNode n = F.rand(picked); + + GridDhtPartitionDemandMessage msg = assigns.get(n); + + if (msg == null) { + assigns.put(n, msg = new GridDhtPartitionDemandMessage( + top.updateSequence(), + exchFut.exchangeId().topologyVersion(), + cctx.cacheId())); + } + + msg.addPartition(p); + } } } - else - startFut.onDone(err); + + return assigns; } /** {@inheritDoc} */ @@ -267,24 +366,77 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { topVer.set(topVer0); } - /** {@inheritDoc} */ - @Override public void onExchangeFutureAdded() { - demandPool.onExchangeFutureAdded(); + /** + * @param p Partition. + * @param topVer Topology version. + * @return Picked owners. + */ + private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) { + Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer); + + int affCnt = affNodes.size(); + + Collection<ClusterNode> rmts = remoteOwners(p, topVer); + + int rmtCnt = rmts.size(); + + if (rmtCnt <= affCnt) + return rmts; + + List<ClusterNode> sorted = new ArrayList<>(rmts); + + // Sort in descending order, so nodes with higher order will be first. + Collections.sort(sorted, CU.nodeComparator(false)); + + // Pick newest nodes. + return sorted.subList(0, affCnt); + } + + /** + * @param p Partition. + * @param topVer Topology version. + * @return Nodes owning this partition. + */ + private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) { + return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId())); } /** {@inheritDoc} */ - @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) { - demandPool.updateLastExchangeFuture(lastFut); + public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) { + if (!enterBusy()) + return; + + try { + demandLock.readLock().lock(); + try { + demander.handleSupplyMessage(idx, id, s); + } + finally { + demandLock.readLock().unlock(); + } + } + finally { + leaveBusy(); + } } /** {@inheritDoc} */ - @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { - return demandPool.assign(exchFut); + public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) { + if (!enterBusy()) + return; + + try { + supplier.handleDemandMessage(idx, id, d); + } + finally { + leaveBusy(); + } } /** {@inheritDoc} */ - @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) { - demandPool.addAssignments(assignments, forcePreload); + @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, + boolean forcePreload, Collection<String> caches, int cnt) { + return demander.addAssignments(assignments, forcePreload, caches, cnt); } /** @@ -296,7 +448,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> syncFuture() { - return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture(); + return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture(); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Boolean> rebalanceFuture() { + return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture(); } /** @@ -580,12 +737,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public void forcePreload() { - demandPool.forcePreload(); + demander.forcePreload(); } /** {@inheritDoc} */ @Override public void unwindUndeploys() { - demandPool.unwindUndeploys(); + demandLock.writeLock().lock(); + + try { + cctx.deploy().unwind(cctx); + } + finally { + demandLock.writeLock().unlock(); + } } /** @@ -607,6 +771,44 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ + @Override public void evictPartitionAsync(GridDhtLocalPartition part) { + partsToEvict.add(part); + + if (partsEvictOwning.get() == 0 && partsEvictOwning.compareAndSet(0, 1)) { + cctx.closures().callLocalSafe(new GPC<Boolean>() { + @Override public Boolean call() { + boolean locked = true; + + while (locked || !partsToEvict.isEmptyx()) { + if (!locked && !partsEvictOwning.compareAndSet(0, 1)) + return false; + + try { + GridDhtLocalPartition part = partsToEvict.poll(); + + if (part != null) + part.tryEvict(); + } + finally { + if (!partsToEvict.isEmptyx()) + locked = true; + else { + boolean res = partsEvictOwning.compareAndSet(1, 0); + + assert res; + + locked = false; + } + } + } + + return true; + } + }, /*system pool*/ true); + } + } + + /** {@inheritDoc} */ @Override public void dumpDebugInfo() { if (!forceKeyFuts.isEmpty()) { U.warn(log, "Pending force key futures [cache=" + cctx.name() +"]:"); @@ -621,6 +823,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) U.warn(log, ">>> " + fut); } + + supplier.dumpDebugInfo(); } /**
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 7c5e97c..810bd8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -1292,6 +1292,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { catch (IgniteCheckedException e) { U.error(log, "Failed to remove count down latch: " + latch0.name(), e); } + finally { + ctx.cache().context().txContextReset(); + } } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 26a41de..9315d7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -696,7 +696,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { if (log.isDebugEnabled()) U.warn(log, "Received response for unknown child job (was job presumed failed?): " + res); - return; + selfOccupied = true; + + continue; } // Only process 1st response and ignore following ones. This scenario http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java index 835cdcb..c95a859 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java @@ -239,7 +239,7 @@ public class GridTuple4<V1, V2, V3, V4> implements Iterable<Object>, Externaliza GridTuple4<?, ?, ?, ?> t = (GridTuple4<?, ?, ?, ?>)o; - return F.eq(val1, t.val2) && F.eq(val2, t.val2) && F.eq(val3, t.val3) && F.eq(val4, t.val4); + return F.eq(val1, t.val1) && F.eq(val2, t.val2) && F.eq(val3, t.val3) && F.eq(val4, t.val4); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 6254605..854ce95 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1956,7 +1956,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * <p> * This method is intended for test purposes only. */ - void simulateNodeFailure() { + protected void simulateNodeFailure() { impl.simulateNodeFailure(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java index 1b2b84d..f4423f7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java @@ -87,7 +87,7 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes } /** Test key 1. */ - public static class TestKey implements Externalizable { + protected static class TestKey implements Externalizable { /** Field. */ @QuerySqlField(index = true) private String field; http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java index cadd03f..fe0b84e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java @@ -17,6 +17,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; @@ -46,12 +51,6 @@ import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionRollbackException; -import javax.cache.CacheException; -import java.util.Collection; -import java.util.Collections; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; - import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -192,13 +191,13 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest { boolean backup, final boolean commit ) throws Exception { - startGrids(gridCount()); - awaitPartitionMapExchange(); + try { + startGrids(gridCount()); + awaitPartitionMapExchange(); - for (int i = 0; i < gridCount(); i++) - info("Grid " + i + ": " + ignite(i).cluster().localNode().id()); + for (int i = 0; i < gridCount(); i++) + info("Grid " + i + ": " + ignite(i).cluster().localNode().id()); - try { final Ignite ignite = ignite(0); final IgniteCache<Object, Object> cache = ignite.cache(null).withNoRetries(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java new file mode 100644 index 0000000..7759c70 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi; + +/** + * + */ +public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration iCfg = super.getConfiguration(gridName); + + for (CacheConfiguration cacheCfg : iCfg.getCacheConfiguration()) { + cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); + } + + return iCfg; + } + + /** + * @throws Exception Exception. + */ + public void testNodeFailedAtRebalancing() throws Exception { + Ignite ignite = startGrid(0); + + generateData(ignite, 0, 0); + + log.info("Preloading started."); + + startGrid(1); + + GridDhtPartitionDemander.RebalanceFuture fut = (GridDhtPartitionDemander.RebalanceFuture)grid(1).context(). + cache().internalCache(CACHE_NAME_DHT_REPLICATED).preloader().rebalanceFuture(); + + fut.get(); + + U.sleep(10); + + ((TestTcpDiscoverySpi)grid(1).configuration().getDiscoverySpi()).simulateNodeFailure(); + + waitForRebalancing(0, 3); + + checkSupplyContextMapIsEmpty(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java new file mode 100644 index 0000000..8c5cd40 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -0,0 +1,506 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import java.util.Map; +import java.util.Random; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static int TEST_SIZE = 100_000; + + /** partitioned cache name. */ + protected static String CACHE_NAME_DHT_PARTITIONED = "cacheP"; + + /** partitioned cache 2 name. */ + protected static String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2"; + + /** replicated cache name. */ + protected static String CACHE_NAME_DHT_REPLICATED = "cacheR"; + + /** replicated cache 2 name. */ + protected static String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2"; + + /** */ + private volatile boolean concurrentStartFinished; + + /** */ + private volatile boolean concurrentStartFinished2; + + /** */ + private volatile boolean concurrentStartFinished3; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration iCfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true); + + if (getTestGridName(10).equals(gridName)) + iCfg.setClientMode(true); + + CacheConfiguration<Integer, Integer> cachePCfg = new CacheConfiguration<>(); + + cachePCfg.setName(CACHE_NAME_DHT_PARTITIONED); + cachePCfg.setCacheMode(CacheMode.PARTITIONED); + cachePCfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cachePCfg.setBackups(1); + cachePCfg.setRebalanceBatchSize(1); + cachePCfg.setRebalanceBatchesPrefetchCount(1); + cachePCfg.setRebalanceOrder(2); + + CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>(); + + cachePCfg2.setName(CACHE_NAME_DHT_PARTITIONED_2); + cachePCfg2.setCacheMode(CacheMode.PARTITIONED); + cachePCfg2.setRebalanceMode(CacheRebalanceMode.SYNC); + cachePCfg2.setBackups(1); + cachePCfg2.setRebalanceOrder(2); + //cachePCfg2.setRebalanceDelay(5000);//Known issue, possible deadlock in case of low priority cache rebalancing delayed. + + CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>(); + + cacheRCfg.setName(CACHE_NAME_DHT_REPLICATED); + cacheRCfg.setCacheMode(CacheMode.REPLICATED); + cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cacheRCfg.setRebalanceBatchSize(1); + cacheRCfg.setRebalanceBatchesPrefetchCount(Integer.MAX_VALUE); + ((TcpCommunicationSpi)iCfg.getCommunicationSpi()).setSharedMemoryPort(-1);//Shmem fail fix for Integer.MAX_VALUE. + + CacheConfiguration<Integer, Integer> cacheRCfg2 = new CacheConfiguration<>(); + + cacheRCfg2.setName(CACHE_NAME_DHT_REPLICATED_2); + cacheRCfg2.setCacheMode(CacheMode.REPLICATED); + cacheRCfg2.setRebalanceMode(CacheRebalanceMode.SYNC); + cacheRCfg2.setRebalanceOrder(4); + + iCfg.setCacheConfiguration(cachePCfg, cachePCfg2, cacheRCfg, cacheRCfg2); + + iCfg.setRebalanceThreadPoolSize(2); + + return iCfg; + } + + /** + * @param ignite Ignite. + */ + protected void generateData(Ignite ignite, int from, int iter) { + generateData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter); + generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter); + generateData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter); + generateData(ignite, CACHE_NAME_DHT_REPLICATED_2, from, iter); + } + + /** + * @param ignite Ignite. + */ + protected void generateData(Ignite ignite, String name, int from, int iter) { + for (int i = from; i < from + TEST_SIZE; i++) { + if (i % (TEST_SIZE / 10) == 0) + log.info("Prepared " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ")."); + + ignite.cache(name).put(i, i + name.hashCode() + iter); + } + } + + /** + * @param ignite Ignite. + * @throws IgniteCheckedException Exception. + */ + protected void checkData(Ignite ignite, int from, int iter) throws IgniteCheckedException { + checkData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter); + checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter); + checkData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter); + checkData(ignite, CACHE_NAME_DHT_REPLICATED_2, from, iter); + } + + /** + * @param ignite Ignite. + * @param name Cache name. + * @throws IgniteCheckedException Exception. + */ + protected void checkData(Ignite ignite, String name, int from, int iter) throws IgniteCheckedException { + for (int i = from; i < from + TEST_SIZE; i++) { + if (i % (TEST_SIZE / 10) == 0) + log.info("<" + name + "> Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ")."); + + assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode() + iter) : + i + " value " + (i + name.hashCode() + iter) + " does not match (" + ignite.cache(name).get(i) + ")"; + } + } + + /** + * @throws Exception Exception + */ + public void testSimpleRebalancing() throws Exception { + Ignite ignite = startGrid(0); + + generateData(ignite, 0, 0); + + log.info("Preloading started."); + + long start = System.currentTimeMillis(); + + startGrid(1); + + waitForRebalancing(0, 2); + waitForRebalancing(1, 2); + + stopGrid(0); + + waitForRebalancing(1, 3); + + startGrid(2); + + waitForRebalancing(1, 4); + waitForRebalancing(2, 4); + + stopGrid(2); + + waitForRebalancing(1, 5); + + long spend = (System.currentTimeMillis() - start) / 1000; + + checkData(grid(1), 0, 0); + + log.info("Spend " + spend + " seconds to rebalance entries."); + } + + /** + * @throws Exception Exception + */ + public void testLoadRebalancing() throws Exception { + final Ignite ignite = startGrid(0); + + startGrid(1); + + generateData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0); + + log.info("Preloading started."); + + long start = System.currentTimeMillis(); + + concurrentStartFinished = false; + + Thread t1 = new Thread() { + @Override public void run() { + Random rdm = new Random(); + + while (!concurrentStartFinished) { + for (int i = 0; i < TEST_SIZE; i++) { + if (i % (TEST_SIZE / 10) == 0) + log.info("Prepared " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ")."); + + int ii = rdm.nextInt(TEST_SIZE); + + ignite.cache(CACHE_NAME_DHT_PARTITIONED).put(ii, ii + CACHE_NAME_DHT_PARTITIONED.hashCode()); + } + } + } + }; + + Thread t2 = new Thread() { + @Override public void run() { + while (!concurrentStartFinished) { + try { + checkData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + } + } + }; + + t1.start(); + t2.start(); + + startGrid(2); + startGrid(3); + + stopGrid(2); + + startGrid(4); + + waitForRebalancing(3, 6); + waitForRebalancing(4, 6); + + concurrentStartFinished = true; + + awaitPartitionMapExchange(true); + + checkSupplyContextMapIsEmpty(); + + t1.join(); + t2.join(); + + long spend = (System.currentTimeMillis() - start) / 1000; + + info("Time to rebalance entries: " + spend); + } + + /** + * @param id Node id. + * @param major Major ver. + * @param minor Minor ver. + * @throws IgniteCheckedException Exception. + */ + protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException { + waitForRebalancing(id, new AffinityTopologyVersion(major, minor)); + } + + /** + * @param id Node id. + * @param major Major ver. + * @throws IgniteCheckedException Exception. + */ + protected void waitForRebalancing(int id, int major) throws IgniteCheckedException { + waitForRebalancing(id, new AffinityTopologyVersion(major)); + } + + /** + * @param id Node id. + * @param top Topology version. + * @throws IgniteCheckedException + */ + protected void waitForRebalancing(int id, AffinityTopologyVersion top) throws IgniteCheckedException { + boolean finished = false; + + while (!finished) { + finished = true; + + for (GridCacheAdapter c : grid(id).context().cache().internalCaches()) { + GridDhtPartitionDemander.RebalanceFuture fut = (GridDhtPartitionDemander.RebalanceFuture)c.preloader().rebalanceFuture(); + if (fut.topologyVersion() == null || !fut.topologyVersion().equals(top)) { + finished = false; + + break; + } + else if (!fut.get()) { + finished = false; + + log.warning("Rebalancing finished with missed partitions."); + } + } + } + } + + /** + * + */ + protected void checkSupplyContextMapIsEmpty() { + for (Ignite g : G.allGrids()) { + for (GridCacheAdapter c : ((IgniteEx)g).context().cache().internalCaches()) { + + Object supplier = U.field(c.preloader(), "supplier"); + + Map map = U.field(supplier, "scMap"); + + synchronized (map) { + assert map.isEmpty(); + } + } + } + } + + @Override protected long getTestTimeout() { + return 5 * 60_000; + } + + /** + * @throws Exception + */ + public void testComplexRebalancing() throws Exception { + final Ignite ignite = startGrid(0); + + generateData(ignite, 0, 0); + + log.info("Preloading started."); + + long start = System.currentTimeMillis(); + + concurrentStartFinished = false; + concurrentStartFinished2 = false; + concurrentStartFinished3 = false; + + Thread t1 = new Thread() { + @Override public void run() { + try { + startGrid(1); + startGrid(2); + + while (!concurrentStartFinished2) { + U.sleep(10); + } + + waitForRebalancing(0, 5, 0); + waitForRebalancing(1, 5, 0); + waitForRebalancing(2, 5, 0); + waitForRebalancing(3, 5, 0); + waitForRebalancing(4, 5, 0); + + //New cache should start rebalancing. + CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>(); + + cacheRCfg.setName(CACHE_NAME_DHT_PARTITIONED + "_NEW"); + cacheRCfg.setCacheMode(CacheMode.PARTITIONED); + cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC); + + grid(0).getOrCreateCache(cacheRCfg); + + while (!concurrentStartFinished3) { + U.sleep(10); + } + + concurrentStartFinished = true; + } + catch (Exception e) { + e.printStackTrace(); + } + } + }; + + Thread t2 = new Thread() { + @Override public void run() { + try { + startGrid(3); + startGrid(4); + + concurrentStartFinished2 = true; + } + catch (Exception e) { + e.printStackTrace(); + } + } + }; + + Thread t3 = new Thread() { + @Override public void run() { + generateData(ignite, 0, 1); + + concurrentStartFinished3 = true; + } + }; + + t1.start(); + t2.start();// Should cancel t1 rebalancing. + t3.start(); + + t1.join(); + t2.join(); + t3.join(); + + waitForRebalancing(0, 5, 1); + waitForRebalancing(1, 5, 1); + waitForRebalancing(2, 5, 1); + waitForRebalancing(3, 5, 1); + waitForRebalancing(4, 5, 1); + + awaitPartitionMapExchange(true); + + checkSupplyContextMapIsEmpty(); + + checkData(grid(4), 0, 1); + + final Ignite ignite3 = grid(3); + + Thread t4 = new Thread() { + @Override public void run() { + generateData(ignite3, 0, 2); + + } + }; + + t4.start(); + + stopGrid(1); + + waitForRebalancing(0, 6); + waitForRebalancing(2, 6); + waitForRebalancing(3, 6); + waitForRebalancing(4, 6); + + awaitPartitionMapExchange(true); + + checkSupplyContextMapIsEmpty(); + + stopGrid(0); + + waitForRebalancing(2, 7); + waitForRebalancing(3, 7); + waitForRebalancing(4, 7); + + awaitPartitionMapExchange(true); + + checkSupplyContextMapIsEmpty(); + + stopGrid(2); + + waitForRebalancing(3, 8); + waitForRebalancing(4, 8); + + awaitPartitionMapExchange(true); + + checkSupplyContextMapIsEmpty(); + + t4.join(); + + stopGrid(3); + + waitForRebalancing(4, 9); + + checkSupplyContextMapIsEmpty(); + + long spend = (System.currentTimeMillis() - start) / 1000; + + checkData(grid(4), 0, 2); + + log.info("Spend " + spend + " seconds to rebalance entries."); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java new file mode 100644 index 0000000..831e82d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** partitioned cache name. */ + protected static String CACHE = "cache"; + + /** Allows to change behavior of readExternal method. */ + protected static AtomicInteger readCnt = new AtomicInteger(); + + /** Test key 1. */ + private static class TestKey implements Externalizable { + /** Field. */ + @QuerySqlField(index = true) + private String field; + + /** + * @param field Test key 1. + */ + public TestKey(String field) { + this.field = field; + } + + /** Test key 1. */ + public TestKey() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + TestKey key = (TestKey)o; + + return !(field != null ? !field.equals(key.field) : key.field != null); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return field != null ? field.hashCode() : 0; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(field); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + field = (String)in.readObject(); + + if (readCnt.decrementAndGet() <= 0) + throw new IOException("Class can not be unmarshalled."); + } + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration iCfg = super.getConfiguration(gridName); + + CacheConfiguration<TestKey, Integer> cfg = new CacheConfiguration<>(); + + cfg.setName(CACHE); + cfg.setCacheMode(CacheMode.PARTITIONED); + cfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cfg.setBackups(0); + + iCfg.setCacheConfiguration(cfg); + + return iCfg; + } + + /** + * @throws Exception e. + */ + public void test() throws Exception { + readCnt.set(Integer.MAX_VALUE); + + startGrid(0); + + for (int i = 0; i < 100; i++) { + grid(0).cache(CACHE).put(new TestKey(String.valueOf(i)), i); + } + + readCnt.set(1); + + startGrid(1); + + readCnt.set(Integer.MAX_VALUE); + + for (int i = 0; i < 50; i++) { + assert grid(1).cache(CACHE).get(new TestKey(String.valueOf(i))) != null; + } + + stopGrid(0); + + for (int i = 50; i < 100; i++) { + assert grid(1).cache(CACHE).get(new TestKey(String.valueOf(i))) == null; + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java index c4ad169..64f1495 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java @@ -142,26 +142,6 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { } /** - * @throws Exception If failed. - */ - public void testSingleZeroPoolSize() throws Exception { - preloadMode = SYNC; - poolSize = 0; - - try { - startGrid(1); - - assert false : "Grid should have been failed to start."; - } - catch (IgniteCheckedException e) { - info("Caught expected exception: " + e); - } - finally { - stopAllGrids(); - } - } - - /** * @throws Exception If test failed. */ public void testIntegrity() throws Exception { @@ -602,4 +582,4 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { // No-op. } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 0280e9c..51d8a2d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -511,23 +511,6 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } /** - * - */ - private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { - /** */ - private boolean ignorePingResponse; - - /** {@inheritDoc} */ - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, - IgniteCheckedException { - if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse) - return; - else - super.writeToSocket(sock, msg, timeout); - } - } - - /** * @throws Exception If any error occurs. */ public void testNodeAdded() throws Exception { http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java new file mode 100644 index 0000000..dbc54bc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import java.io.IOException; +import java.net.Socket; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; + +/** + * + */ +public class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** */ + public boolean ignorePingResponse; + + /** {@inheritDoc} */ + protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, + IgniteCheckedException { + if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse) + return; + else + super.writeToSocket(sock, msg, timeout); + } + + /** {@inheritDoc} */ + @Override public void simulateNodeFailure() { + super.simulateNodeFailure(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index d133a84..41d4b4a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -77,6 +77,7 @@ import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -1228,7 +1229,7 @@ public abstract class GridAbstractTest extends TestCase { cfg.setCommunicationSpi(commSpi); - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + TcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi(); if (isDebug()) { discoSpi.setMaxMissedHeartbeats(Integer.MAX_VALUE); http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 28d5c73..71f3ee3 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -63,6 +63,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache; @@ -414,6 +416,15 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { */ @SuppressWarnings("BusyWait") protected void awaitPartitionMapExchange() throws InterruptedException { + awaitPartitionMapExchange(false); + } + + /** + * @param waitEvicts If {@code true} will wait for evictions finished. + * @throws InterruptedException If interrupted. + */ + @SuppressWarnings("BusyWait") + protected void awaitPartitionMapExchange(boolean waitEvicts) throws InterruptedException { for (Ignite g : G.allGrids()) { IgniteKernal g0 = (IgniteKernal)g; @@ -451,7 +462,10 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { int actual = owners.size(); - if (affNodes.size() != owners.size() || !affNodes.containsAll(owners)) { + GridDhtLocalPartition loc = top.localPartition(p, readyVer, false); + + if (affNodes.size() != owners.size() || !affNodes.containsAll(owners) || + (waitEvicts && loc != null && loc.state() == GridDhtPartitionState.RENTING)) { LT.warn(log(), null, "Waiting for topology map update [" + "grid=" + g.name() + ", cache=" + cfg.getName() + @@ -484,7 +498,9 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { if (i == 0) start = System.currentTimeMillis(); - if (System.currentTimeMillis() - start > 30_000) + if (System.currentTimeMillis() - start > 30_000) { + U.dumpThreads(log); + throw new IgniteException("Timeout of waiting for topology map update [" + "grid=" + g.name() + ", cache=" + cfg.getName() + @@ -493,6 +509,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { ", p=" + p + ", readVer=" + readyVer + ", locNode=" + g.cluster().localNode() + ']'); + } Thread.sleep(200); // Busy wait. http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java index 796c531..c3c3659 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java @@ -49,6 +49,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNea import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearPartitionedP2PEnabledByteArrayValuesSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePutArrayValueSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheDaemonNodeReplicatedSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedAtomicGetAndTransformStoreSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedBasicApiTest; @@ -135,6 +137,8 @@ public class IgniteCacheTestSuite3 extends TestSuite { suite.addTestSuite(IgniteTxReentryColocatedSelfTest.class); suite.addTestSuite(GridCacheOrderedPreloadingSelfTest.class); + suite.addTestSuite(GridCacheRebalancingSyncSelfTest.class); + suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class); // Test for byte array value special case. suite.addTestSuite(GridCacheLocalByteArrayValuesSelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java index 0226046..582bfe3 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java @@ -107,25 +107,33 @@ public class GridOrderedMessageCancelSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testTask() throws Exception { + Map map = U.field(((IgniteKernal)grid(0)).context().io(), "msgSetMap"); + + int initSize = map.size(); + ComputeTaskFuture<?> fut = executeAsync(compute(grid(0).cluster().forRemotes()), Task.class, null); - testMessageSet(fut); + testMessageSet(fut, initSize, map); } /** * @throws Exception If failed. */ public void testTaskException() throws Exception { + Map map = U.field(((IgniteKernal)grid(0)).context().io(), "msgSetMap"); + + int initSize = map.size(); + ComputeTaskFuture<?> fut = executeAsync(compute(grid(0).cluster().forRemotes()), FailTask.class, null); - testMessageSet(fut); + testMessageSet(fut, initSize, map); } /** * @param fut Future to cancel. * @throws Exception If failed. */ - private void testMessageSet(IgniteFuture<?> fut) throws Exception { + private void testMessageSet(IgniteFuture<?> fut, int initSize, Map map) throws Exception { cancelLatch.await(); assertTrue(fut.cancel()); @@ -134,11 +142,9 @@ public class GridOrderedMessageCancelSelfTest extends GridCommonAbstractTest { assertTrue(U.await(finishLatch, 5000, MILLISECONDS)); - Map map = U.field(((IgniteKernal)grid(0)).context().io(), "msgSetMap"); - info("Map: " + map); - assertTrue(map.isEmpty()); + assertEquals(map.size(), initSize); } /**