ignite-1.5 Fixed GridNearAtomicUpdateFuture to do not complete future before near cache is updated. Several test fixes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/96feee9f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/96feee9f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/96feee9f Branch: refs/heads/ignite-1537 Commit: 96feee9f3db9fc792bb1cd7f9d4a72aa28774783 Parents: e45f8ae Author: sboikov <sboi...@gridgain.com> Authored: Tue Dec 15 14:40:21 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Dec 15 14:40:21 2015 +0300 ---------------------------------------------------------------------- .../CacheDataStructuresManager.java | 5 +- .../distributed/dht/GridDhtCacheAdapter.java | 7 +- .../dht/GridPartitionedSingleGetFuture.java | 7 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 10 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 36 +++- .../dht/atomic/GridNearAtomicUpdateRequest.java | 25 +++ .../distributed/near/GridNearAtomicCache.java | 3 + .../CacheSerializableTransactionsTest.java | 8 + .../cache/GridCacheAbstractFullApiSelfTest.java | 8 + ...IgniteCacheAtomicPutAllFailoverSelfTest.java | 1 + ...gniteAtomicLongChangingTopologySelfTest.java | 8 +- ...omicMultiNodeP2PDisabledFullApiSelfTest.java | 5 - ...ledFairAffinityMultiNodeFullApiSelfTest.java | 5 - .../near/NearCacheSyncUpdateTest.java | 167 +++++++++++++++++++ ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 30 +++- ...CommunicationRecoveryAckClosureSelfTest.java | 39 +++-- .../testsuites/IgniteCacheTestSuite2.java | 2 + .../IgniteSpiDiscoverySelfTestSuite.java | 6 + 18 files changed, 322 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index f56cbf8..47c3dd9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -39,6 +39,7 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; @@ -493,7 +494,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { true).get(); } catch (IgniteCheckedException e) { - if (e.hasCause(ClusterTopologyException.class)) { + if (e.hasCause(ClusterTopologyCheckedException.class)) { if (log.isDebugEnabled()) log.debug("RemoveSetData job failed, will retry: " + e); @@ -516,7 +517,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { true).get(); } catch (IgniteCheckedException e) { - if (e.hasCause(ClusterTopologyException.class)) { + if (e.hasCause(ClusterTopologyCheckedException.class)) { if (log.isDebugEnabled()) log.debug("RemoveSetData job failed, will retry: " + e); http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 9199e70..9cf8084 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -775,9 +775,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap res.setContainsValue(); } else { + AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion(); + + assert topVer.compareTo(req.topologyVersion()) >= 0 : "Wrong ready topology version for " + + "invalid partitions response [topVer=" + topVer + ", req=" + req + ']'; + res = new GridNearSingleGetResponse(ctx.cacheId(), req.futureId(), - ctx.shared().exchange().readyAffinityVersion(), + topVer, null, true, req.addDeploymentInfo()); http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index f3f225a..5d0814f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -191,8 +191,6 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im */ @SuppressWarnings("unchecked") private void map(AffinityTopologyVersion topVer) { - this.topVer = topVer; - ClusterNode node = mapKeyToNode(topVer); if (node == null) { @@ -250,6 +248,9 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im } else { synchronized (this) { + assert this.node == null; + + this.topVer = topVer; this.node = node; } @@ -325,7 +326,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im GridDhtCacheAdapter colocated = cctx.dht(); while (true) { - GridCacheEntryEx entry = null; + GridCacheEntryEx entry; try { entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) : http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 1f4cb6a..7bee5a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -151,11 +151,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid @GridDirectTransient private boolean onRes; + /** */ @GridDirectTransient private List<Integer> partIds; + /** */ @GridDirectTransient - private List<CacheObject> localPrevVals; + private List<CacheObject> locPrevVals; /** Keep binary flag. */ private boolean keepBinary; @@ -213,7 +215,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid keys = new ArrayList<>(); partIds = new ArrayList<>(); - localPrevVals = new ArrayList<>(); + locPrevVals = new ArrayList<>(); if (forceTransformBackups) { entryProcessors = new ArrayList<>(); @@ -254,7 +256,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid partIds.add(partId); - localPrevVals.add(prevVal); + locPrevVals.add(prevVal); if (forceTransformBackups) { assert entryProcessor != null; @@ -519,7 +521,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid * @return Value. */ @Nullable public CacheObject localPreviousValue(int idx) { - return localPrevVals.get(idx); + return locPrevVals.get(idx); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index ba3d546..b384bab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -167,6 +167,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param subjId Subject ID. * @param taskNameHash Task name hash code. * @param skipStore Skip store flag. + * @param keepBinary Keep binary flag. * @param remapCnt Maximum number of retries. * @param waitTopFut If {@code false} does not wait for affinity change future. */ @@ -359,7 +360,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param res Update response. */ private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { - if (!nearEnabled || !req.hasPrimary()) + assert nearEnabled; + + if (res.remapKeys() != null || !req.hasPrimary()) return; GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); @@ -544,6 +547,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> @GridToStringInclude private Map<UUID, GridNearAtomicUpdateRequest> mappings; + /** */ + private int resCnt; + /** Error. */ private CachePartialUpdateCheckedException err; @@ -583,7 +589,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> else req = mappings != null ? mappings.get(nodeId) : null; - if (req != null) { + if (req != null && req.response() == null) { res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion(), cctx.deploymentEnabled()); @@ -632,10 +638,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> rcvAll = true; } else { - req = mappings != null ? mappings.remove(nodeId) : null; + req = mappings != null ? mappings.get(nodeId) : null; + + if (req != null && req.onResponse(res)) { + resCnt++; - if (req != null) - rcvAll = mappings.isEmpty(); + rcvAll = mappings.size() == resCnt; + } else return; } @@ -731,8 +740,19 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return; } - if (!nodeErr && res.remapKeys() == null) - updateNear(req, res); + if (rcvAll && nearEnabled) { + if (mappings != null) { + for (GridNearAtomicUpdateRequest req0 : mappings.values()) { + GridNearAtomicUpdateResponse res0 = req0.response(); + + assert res0 != null : req0; + + updateNear(req0, res0); + } + } + else if (!nodeErr) + updateNear(req, res); + } if (remapTopVer != null) { if (fut0 != null) @@ -828,6 +848,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> assert futVer == null : this; assert this.topVer == AffinityTopologyVersion.ZERO : this; + resCnt = 0; + this.topVer = topVer; futVer = cctx.versions().next(topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index c24ad34..7c0aba5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -154,6 +154,10 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** Keep binary flag. */ private boolean keepBinary; + /** */ + @GridDirectTransient + private GridNearAtomicUpdateResponse res; + /** * Empty constructor required by {@link Externalizable}. */ @@ -544,6 +548,27 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri return hasPrimary; } + /** + * @param res Response. + * @return {@code True} if current response was {@code null}. + */ + public boolean onResponse(GridNearAtomicUpdateResponse res) { + if (this.res == null) { + this.res = res; + + return true; + } + + return false; + } + + /** + * @return Response. + */ + @Nullable public GridNearAtomicUpdateResponse response() { + return res; + } + /** {@inheritDoc} * @param ctx*/ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 06898cd..a2d5adb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -130,6 +130,9 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res ) { + if (F.size(res.failedKeys()) == req.keys().size()) + return; + /* * Choose value to be stored in near cache: first check key is not in failed and not in skipped list, * then check if value was generated on primary node, if not then use value sent in request. http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index ae64bb4..f4533f2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -2981,6 +2981,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { IgniteInternalFuture<?> restartFut = restart ? restartFuture(stop, null) : null; + final long stopTime = U.currentTimeMillis() + getTestTimeout() - 30_000; + for (int i = 0; i < 30; i++) { final AtomicInteger cntr = new AtomicInteger(); @@ -3007,6 +3009,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { barrier.await(); for (int i = 0; i < 1000; i++) { + if (i % 100 == 0 && U.currentTimeMillis() > stopTime) + break; + try { try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { Integer val = cache.get(key); @@ -3036,6 +3041,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { assertTrue(cntr.get() > 0); checkValue(key, cntr.get(), cacheName, restart); + + if (U.currentTimeMillis() > stopTime) + break; } stop.set(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index b984afa..5b294cc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -120,6 +120,9 @@ import static org.apache.ignite.transactions.TransactionState.COMMITTED; */ @SuppressWarnings("TransientFieldInNonSerializableClass") public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstractSelfTest { + /** Test timeout */ + private static final long TEST_TIMEOUT = 60 * 1000; + /** */ public static final CacheEntryProcessor<String, Integer, String> ERR_PROCESSOR = new CacheEntryProcessor<String, Integer, String>() { @@ -166,6 +169,11 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract private Map<String, CacheConfiguration[]> cacheCfgMap; /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** {@inheritDoc} */ @Override protected int gridCount() { return 1; } http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPutAllFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPutAllFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPutAllFailoverSelfTest.java index b3464b8..3f9fc5c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPutAllFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPutAllFailoverSelfTest.java @@ -30,6 +30,7 @@ public class IgniteCacheAtomicPutAllFailoverSelfTest extends GridCachePutAllFail return ATOMIC; } + /** {@inheritDoc} */ @Override public void testPutAllFailoverColocatedNearEnabledTwoBackupsOffheapTieredSwap(){ fail("https://issues.apache.org/jira/browse/IGNITE-1584"); } http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java index 74cb9ef..f14dc5a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java @@ -399,8 +399,14 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract /** * @param i Node index. + * @param startLatch Thread start latch. + * @param run Run flag. + * @throws Exception If failed. + * @return Threads future. */ - private IgniteInternalFuture<?> startNodeAndCreaterThread(final int i, final CountDownLatch startLatch, final AtomicBoolean run) + private IgniteInternalFuture<?> startNodeAndCreaterThread(final int i, + final CountDownLatch startLatch, + final AtomicBoolean run) throws Exception { return multithreadedAsync(new Runnable() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.java index c468cc2..d4efff3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.java @@ -65,9 +65,4 @@ public class GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest return ccfg; } - - /** {@inheritDoc} */ - @Override public void testWithSkipStore() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1809"); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest.java index e4784f2..64943e6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest.java @@ -33,9 +33,4 @@ public class GridCacheAtomicNearEnabledFairAffinityMultiNodeFullApiSelfTest return cfg; } - - /** {@inheritDoc} */ - @Override public void testWithSkipStore(){ - fail("https://issues.apache.org/jira/browse/IGNITE-1582"); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheSyncUpdateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheSyncUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheSyncUpdateTest.java new file mode 100644 index 0000000..bbdf50e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NearCacheSyncUpdateTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicWriteOrderMode; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class NearCacheSyncUpdateTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(3); + } + + /** + * @throws Exception If failed. + */ + public void testNearCacheSyncUpdateAtomic1() throws Exception { + nearCacheSyncUpdateTx(ATOMIC, CLOCK); + } + + /** + * @throws Exception If failed. + */ + public void testNearCacheSyncUpdateAtomic2() throws Exception { + nearCacheSyncUpdateTx(ATOMIC, PRIMARY); + } + + /** + * @throws Exception If failed. + */ + public void testNearCacheSyncUpdateTx() throws Exception { + nearCacheSyncUpdateTx(TRANSACTIONAL, null); + } + + /** + * @param atomicityMode Atomicity mode. + * @param writeOrderMode Write order mode. + * @throws Exception If failed. + */ + private void nearCacheSyncUpdateTx(CacheAtomicityMode atomicityMode, + CacheAtomicWriteOrderMode writeOrderMode) throws Exception { + final IgniteCache<Integer, Integer> cache = + ignite(0).createCache(cacheConfiguration(atomicityMode, writeOrderMode)); + + try { + final AtomicInteger idx = new AtomicInteger(); + + final int KEYS_PER_THREAD = 5000; + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + int idx0 = idx.getAndIncrement(); + + int startKey = KEYS_PER_THREAD * idx0; + + for (int i = startKey; i < startKey + KEYS_PER_THREAD; i++) { + cache.put(i, i); + + assertEquals(i, (Object)cache.localPeek(i)); + + cache.remove(i); + + assertNull(cache.get(i)); + } + + final int BATCH_SIZE = 50; + + Map<Integer, Integer> map = new TreeMap<>(); + + for (int i = startKey; i < startKey + KEYS_PER_THREAD; i++) { + map.put(i, i); + + if (map.size() == BATCH_SIZE) { + cache.putAll(map); + + for (Integer key : map.keySet()) + assertEquals(key, cache.localPeek(key)); + + cache.removeAll(map.keySet()); + + for (Integer key : map.keySet()) + assertNull(cache.get(key)); + + map.clear(); + } + } + + return null; + } + }, 10, "update-thread"); + } + finally { + ignite(0).destroyCache(null); + } + } + + /** + * @param atomicityMode Atomicity mode. + * @param writeOrderMode Write order mode. + * @return Cache configuration. + */ + private CacheConfiguration<Integer, Integer> cacheConfiguration(CacheAtomicityMode atomicityMode, + CacheAtomicWriteOrderMode writeOrderMode) { + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setAtomicWriteOrderMode(writeOrderMode); + ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>()); + + return ccfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index a709cc4..38e3d98 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiAdapter; +import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.CommunicationListener; import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.GridTestMessage; @@ -133,6 +134,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS * @param msgPerIter Messages per iteration. * @throws Exception If failed. */ + @SuppressWarnings("unchecked") private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception { createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT); @@ -196,8 +198,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS final TestListener lsnr = (TestListener)spi.getListener(); GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { + @Override public boolean apply() { return lsnr.rcvCnt.get() >= expMsgs0; } }, 5000); @@ -247,6 +248,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS /** * @throws Exception If failed. */ + @SuppressWarnings("unchecked") private void checkOverflow() throws Exception { TcpCommunicationSpi spi0 = spis.get(0); TcpCommunicationSpi spi1 = spis.get(1); @@ -266,8 +268,20 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS final GridNioSession ses0 = communicationSession(spi0); - for (int i = 0; i < 150; i++) - spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0)); + int sentMsgs = 1; + + for (int i = 0; i < 150; i++) { + try { + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0)); + + sentMsgs++; + } + catch (IgniteSpiException e) { + log.info("Send error [err=" + e + ", sentMsgs=" + sentMsgs + ']'); + + break; + } + } // Wait when session is closed because of queue overflow. GridTestUtils.waitForCondition(new GridAbsPredicate() { @@ -283,13 +297,12 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS for (int i = 0; i < 100; i++) spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0)); - final int expMsgs = 251; + final int expMsgs = sentMsgs + 100; final TestListener lsnr = (TestListener)spi1.getListener(); GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { + @Override public boolean apply() { return lsnr.rcvCnt.get() >= expMsgs; } }, 5000); @@ -307,8 +320,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS final GridNioServer srv = U.field(spi, "nioSrvr"); GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { + @Override public boolean apply() { Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); return !sessions.isEmpty(); http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java index fd2d91a..7521f2e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java @@ -40,6 +40,7 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiAdapter; +import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.CommunicationListener; import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.GridTestMessage; @@ -135,6 +136,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic * @param msgPerIter Messages per iteration. * @throws Exception If failed. */ + @SuppressWarnings("unchecked") private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception { createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT); @@ -154,7 +156,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic final AtomicInteger ackMsgs = new AtomicInteger(0); - IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() { + IgniteInClosure<IgniteException> ackC = new CI1<IgniteException>() { @Override public void apply(IgniteException o) { assert o == null; @@ -163,9 +165,9 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic }; for (int j = 0; j < msgPerIter; j++) { - spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC); - spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackClosure); + spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackC); } expMsgs += msgPerIter; @@ -207,8 +209,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic final TestListener lsnr = (TestListener)spi.getListener(); GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { + @Override public boolean apply() { return lsnr.rcvCnt.get() >= expMsgs0; } }, 5000); @@ -260,6 +261,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic /** * @throws Exception If failed. */ + @SuppressWarnings("unchecked") private void checkOverflow() throws Exception { TcpCommunicationSpi spi0 = spis.get(0); TcpCommunicationSpi spi1 = spis.get(1); @@ -271,7 +273,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic final AtomicInteger ackMsgs = new AtomicInteger(0); - IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() { + IgniteInClosure<IgniteException> ackC = new CI1<IgniteException>() { @Override public void apply(IgniteException o) { assert o == null; @@ -282,15 +284,27 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic int msgId = 0; // Send message to establish connection. - spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC); // Prevent node1 from send GridTestUtils.setFieldValue(srv1, "skipWrite", true); final GridNioSession ses0 = communicationSession(spi0); - for (int i = 0; i < 150; i++) - spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + int sentMsgs = 1; + + for (int i = 0; i < 150; i++) { + try { + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC); + + sentMsgs++; + } + catch (IgniteSpiException e) { + log.info("Send error [err=" + e + ", sentMsgs=" + sentMsgs + ']'); + + break; + } + } // Wait when session is closed because of queue overflow. GridTestUtils.waitForCondition(new GridAbsPredicate() { @@ -304,9 +318,9 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic GridTestUtils.setFieldValue(srv1, "skipWrite", false); for (int i = 0; i < 100; i++) - spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC); - final int expMsgs = 251; + final int expMsgs = sentMsgs + 100; final TestListener lsnr = (TestListener)spi1.getListener(); @@ -335,8 +349,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic final GridNioServer srv = U.field(spi, "nioSrvr"); GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { + @Override public boolean apply() { Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); return !sessions.isEmpty(); http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index c94931e..cadcba7 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -106,6 +106,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePar import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedTxTimeoutSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheRendezvousAffinityClientSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridPartitionedBackupLoadSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.NearCacheSyncUpdateTest; import org.apache.ignite.internal.processors.cache.distributed.near.NoneRebalanceModeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedEvictionSelfTest; import org.apache.ignite.internal.processors.cache.local.GridCacheLocalAtomicBasicStoreSelfTest; @@ -246,6 +247,7 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTest(new TestSuite(CrossCacheTxRandomOperationsTest.class)); suite.addTest(new TestSuite(IgniteDynamicCacheAndNodeStop.class)); suite.addTest(new TestSuite(CacheLockReleaseNodeLeaveTest.class)); + suite.addTest(new TestSuite(NearCacheSyncUpdateTest.class)); return suite; } http://git-wip-us.apache.org/repos/asf/ignite/blob/96feee9f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index af86fbb..af7eb7e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -39,6 +39,9 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinde import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinderSelfTest; import org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinderSelfTest; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinderSelfTest; +import org.apache.ignite.testframework.GridTestUtils; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP; /** * Test suite for all discovery spi implementations. @@ -49,6 +52,9 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite { * @throws Exception If failed. */ public static TestSuite suite() throws Exception { + System.setProperty(IGNITE_OVERRIDE_MCAST_GRP, + GridTestUtils.getNextMulticastGroup(IgniteSpiDiscoverySelfTestSuite.class)); + TestSuite suite = new TestSuite("Ignite Discovery SPI Test Suite"); // Tcp.