Repository: ignite Updated Branches: refs/heads/ignite-4424 [created] 23b606e8c
IGNITE-4424 REPLICATED cache isn't synced across nodes Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/23b606e8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/23b606e8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/23b606e8 Branch: refs/heads/ignite-4424 Commit: 23b606e8c503d843bfc97c25ca0a5cb574f9614a Parents: 8dd4ada Author: Anton Vinogradov <a...@apache.org> Authored: Thu Dec 22 18:37:07 2016 +0300 Committer: Anton Vinogradov <a...@apache.org> Committed: Thu Dec 22 18:37:07 2016 +0300 ---------------------------------------------------------------------- .../GridNearAtomicAbstractUpdateFuture.java | 24 +++ .../GridNearAtomicSingleUpdateFuture.java | 37 ++-- .../dht/atomic/GridNearAtomicUpdateFuture.java | 43 ++-- .../AtomicPutAllChangingTopologyTest.java | 195 +++++++++++++++++++ .../IgniteCacheFailoverTestSuite.java | 3 + 5 files changed, 246 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/23b606e8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 2fbabaa..fd0a699 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -212,6 +212,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt // Cannot remap. remapCnt = 1; + beforeMap(topVer); + map(topVer); } } @@ -314,4 +316,26 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt onResult(req.nodeId(), res, true); } } + + /** + * Adds future prevents topology change before operation complete. + * Should be invoked before topology lock released. + * @param topVer Topology version. + */ + protected void beforeMap(AffinityTopologyVersion topVer) { + GridCacheVersion futVer = cctx.versions().next(topVer); + + synchronized (mux) { + assert this.futVer == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; + + this.topVer = topVer; + this.futVer = futVer; + } + + if (storeFuture()) { + if (!cctx.mvcc().addAtomicFuture(futVer, this)) + assert isDone() : this; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/23b606e8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index bd231cf..0870e5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -348,14 +348,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - try { - AffinityTopologyVersion topVer = fut.get(); - - map(topVer); - } - catch (IgniteCheckedException e) { - onDone(e); - } + mapOnTopology(); } }); } @@ -410,6 +403,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } topVer = fut.topologyVersion(); + + beforeMap(topVer); } else { if (waitTopFut) { @@ -439,7 +434,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } /** {@inheritDoc} */ - protected void map(AffinityTopologyVersion topVer) { + @Override protected void map(AffinityTopologyVersion topVer) { Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { @@ -449,11 +444,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda return; } - Exception err = null; - GridNearAtomicAbstractUpdateRequest singleReq0 = null; - - GridCacheVersion futVer = cctx.versions().next(topVer); - GridCacheVersion updVer; // Assign version on near node in CLOCK ordering mode even if fastMap is false. @@ -470,16 +460,19 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda else updVer = null; + Exception err = null; + GridNearAtomicAbstractUpdateRequest singleReq0 = null; + + GridCacheVersion futVer = cctx.versions().next(topVer); + try { singleReq0 = mapSingleUpdate(topVer, futVer, updVer); synchronized (mux) { - assert this.futVer == null : this; - assert this.topVer == AffinityTopologyVersion.ZERO : this; + assert this.futVer != null : this; + assert this.topVer == topVer : this; - this.topVer = topVer; this.updVer = updVer; - this.futVer = futVer; resCnt = 0; @@ -496,14 +489,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda return; } - if (storeFuture()) { - if (!cctx.mvcc().addAtomicFuture(futVer, this)) { - assert isDone() : this; - - return; - } - } - // Optimize mapping for single key. mapSingle(singleReq0.nodeId(), singleReq0); } http://git-wip-us.apache.org/repos/asf/ignite/blob/23b606e8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index cd64117..a4634ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -456,14 +456,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - try { - AffinityTopologyVersion topVer = fut.get(); - - map(topVer, remapKeys); - } - catch (IgniteCheckedException e) { - onDone(e); - } + mapOnTopology(); } }); } @@ -519,6 +512,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } topVer = fut.topologyVersion(); + + beforeMap(topVer); } else { if (waitTopFut) { @@ -544,7 +539,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu cache.topology().readUnlock(); } - map(topVer, null); + map(topVer, remapKeys); } /** @@ -602,7 +597,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** {@inheritDoc} */ - protected void map(AffinityTopologyVersion topVer) { + @Override protected void map(AffinityTopologyVersion topVer) { map(topVer, null); } @@ -620,14 +615,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu return; } - Exception err = null; - GridNearAtomicFullUpdateRequest singleReq0 = null; - Map<UUID, GridNearAtomicFullUpdateRequest> mappings0 = null; - - int size = keys.size(); - - GridCacheVersion futVer = cctx.versions().next(topVer); - GridCacheVersion updVer; // Assign version on near node in CLOCK ordering mode even if fastMap is false. @@ -644,6 +631,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu else updVer = null; + Exception err = null; + GridNearAtomicFullUpdateRequest singleReq0 = null; + Map<UUID, GridNearAtomicFullUpdateRequest> mappings0 = null; + + int size = keys.size(); + try { if (size == 1 && !fastMap) { assert remapKeys == null || remapKeys.size() == 1; @@ -676,12 +669,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } synchronized (mux) { - assert this.futVer == null : this; - assert this.topVer == AffinityTopologyVersion.ZERO : this; + assert this.futVer != null : this; + assert this.topVer == topVer : this; - this.topVer = topVer; this.updVer = updVer; - this.futVer = futVer; resCnt = 0; @@ -701,14 +692,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu return; } - if (storeFuture()) { - if (!cctx.mvcc().addAtomicFuture(futVer, this)) { - assert isDone() : this; - - return; - } - } - // Optimize mapping for single key. if (singleReq0 != null) mapSingle(singleReq0.nodeId(), singleReq0); http://git-wip-us.apache.org/repos/asf/ignite/blob/23b606e8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/AtomicPutAllChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/AtomicPutAllChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/AtomicPutAllChangingTopologyTest.java new file mode 100644 index 0000000..5920dea --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/AtomicPutAllChangingTopologyTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** */ +public class AtomicPutAllChangingTopologyTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES_CNT = 3; + + /** */ + public static final String CACHE_NAME = "test-cache"; + + /** */ + private static final int CACHE_SIZE = 20_000; + + /** */ + private static volatile CountDownLatch FILLED_LATCH; + + /** */ + private CacheConfiguration<Integer, Integer> cacheCfg() { + return new CacheConfiguration<Integer, Integer>() + .setAtomicityMode(CacheAtomicityMode.ATOMIC) + .setCacheMode(CacheMode.REPLICATED) + .setAffinity(new FairAffinityFunction(false, 1)) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setRebalanceMode(CacheRebalanceMode.SYNC) + .setName(CACHE_NAME); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + return cfg; + } + + /** + * + */ + public void testPutAllOnChangingTopology() throws Exception { + List<IgniteInternalFuture> futs = new LinkedList<>(); + + for (int i = 1; i < NODES_CNT; i++) + futs.add(startNodeAsync(i)); + + futs.add(startSeedNodeAsync()); + + boolean failed = false; + + for (IgniteInternalFuture fut : futs) { + try { + fut.get(); + } + catch (Throwable th) { + log.error("Check failed.", th); + + failed = true; + } + } + + if (failed) + throw new RuntimeException("Test Failed."); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + FILLED_LATCH = new CountDownLatch(1); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** */ + private IgniteInternalFuture startSeedNodeAsync() throws IgniteCheckedException { + return GridTestUtils.runAsync(new Callable<Object>() { + @Override public Boolean call() throws Exception { + Ignite node = startGrid(0); + + log.info("Creating cache."); + + IgniteCache<Integer, Integer> cache = node.getOrCreateCache(cacheCfg()); + + log.info("Created cache."); + + Map<Integer, Integer> data = new HashMap<>(CACHE_SIZE); + + for (int i = 0; i < CACHE_SIZE; i++) + data.put(i, i); + + log.info("Filling."); + + cache.putAll(data); + + log.info("Filled."); + + FILLED_LATCH.countDown(); + + checkCacheState(node, cache); + + return true; + } + }); + } + + /** */ + private IgniteInternalFuture startNodeAsync(final int nodeId) throws IgniteCheckedException { + return GridTestUtils.runAsync(new Callable<Object>() { + @Override public Boolean call() throws Exception { + Ignite node = startGrid(nodeId); + + log.info("Getting cache."); + + IgniteCache<Integer, Integer> cache = node.getOrCreateCache(cacheCfg()); + + log.info("Got cache."); + + FILLED_LATCH.await(); + + log.info("Got Filled."); + + cache.put(1, nodeId); + + checkCacheState(node, cache); + + return true; + } + }); + } + + /** */ + private void checkCacheState(Ignite node, IgniteCache<Integer, Integer> cache) throws Exception { + int locSize = cache.localSize(CachePeekMode.PRIMARY, CachePeekMode.BACKUP); + int locSize2 = -1; + + if (locSize != CACHE_SIZE) { + U.sleep(5000); + + // Rechecking. + locSize2 = cache.localSize(CachePeekMode.PRIMARY, CachePeekMode.BACKUP); + } + + assertEquals("Wrong cache size on node " + "[node=" + node.configuration().getGridName() + + ", expected= " + CACHE_SIZE + ", actual=" + locSize + ", actual2=" + locSize2 + "]", + locSize, CACHE_SIZE); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/23b606e8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java index c9e507d..5bc6729 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtR import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheTxNodeFailureSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridNearCacheTxNodeFailureSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteAtomicLongChangingTopologySelfTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.AtomicPutAllChangingTopologyTest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCacheAtomicClientInvalidPartitionHandlingSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCacheAtomicClientRemoveFailureTest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCacheAtomicInvalidPartitionHandlingSelfTest; @@ -95,6 +96,8 @@ public class IgniteCacheFailoverTestSuite extends TestSuite { suite.addTestSuite(GridCacheTxNodeFailureSelfTest.class); suite.addTestSuite(GridNearCacheTxNodeFailureSelfTest.class); + suite.addTestSuite(AtomicPutAllChangingTopologyTest.class); + return suite; } }