Repository: ignite Updated Branches: refs/heads/ignite-1537 0adfd928b -> 185c28ae6
ignite-647 Fixed issues with dynamic cache start when fair affinity is used Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/185c28ae Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/185c28ae Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/185c28ae Branch: refs/heads/ignite-1537 Commit: 185c28ae66c24c5c4d446f37c416a0091de61f8d Parents: 0adfd92 Author: sboikov <sboi...@gridgain.com> Authored: Wed Dec 23 17:48:52 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Dec 23 17:48:52 2015 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 89 ++++++++---------- ...ridNearOptimisticTxPrepareFutureAdapter.java | 12 ++- ...yMetadataUpdateChangingTopologySelfTest.java | 96 +++++++++++++------- 3 files changed, 110 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/185c28ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 634a9ea..393413e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1290,59 +1290,48 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheReturn retVal = null; - IgniteTxManager tm = ctx.tm(); + if (keys.size() > 1 && // Several keys ... + writeThrough() && !req.skipStore() && // and store is enabled ... + !ctx.store().isLocal() && // and this is not local store ... + !ctx.dr().receiveEnabled() // and no DR. + ) { + // This method can only be used when there are no replicated entries in the batch. + UpdateBatchResult updRes = updateWithBatch(node, + hasNear, + req, + res, + locked, + ver, + dhtFut, + completionCb, + ctx.isDrEnabled(), + taskName, + expiry, + sndPrevVal); - // Needed for metadata cache transaction. - boolean set = tm.setTxTopologyHint(req.topologyVersion()); + deleted = updRes.deleted(); + dhtFut = updRes.dhtFuture(); - try { - if (keys.size() > 1 && // Several keys ... - writeThrough() && !req.skipStore() && // and store is enabled ... - !ctx.store().isLocal() && // and this is not local store ... - !ctx.dr().receiveEnabled() // and no DR. - ) { - // This method can only be used when there are no replicated entries in the batch. - UpdateBatchResult updRes = updateWithBatch(node, - hasNear, - req, - res, - locked, - ver, - dhtFut, - completionCb, - ctx.isDrEnabled(), - taskName, - expiry, - sndPrevVal); - - deleted = updRes.deleted(); - dhtFut = updRes.dhtFuture(); - - if (req.operation() == TRANSFORM) - retVal = updRes.invokeResults(); - } - else { - UpdateSingleResult updRes = updateSingle(node, - hasNear, - req, - res, - locked, - ver, - dhtFut, - completionCb, - ctx.isDrEnabled(), - taskName, - expiry, - sndPrevVal); - - retVal = updRes.returnValue(); - deleted = updRes.deleted(); - dhtFut = updRes.dhtFuture(); - } + if (req.operation() == TRANSFORM) + retVal = updRes.invokeResults(); } - finally { - if (set) - tm.setTxTopologyHint(null); + else { + UpdateSingleResult updRes = updateSingle(node, + hasNear, + req, + res, + locked, + ver, + dhtFut, + completionCb, + ctx.isDrEnabled(), + taskName, + expiry, + sndPrevVal); + + retVal = updRes.returnValue(); + deleted = updRes.deleted(); + dhtFut = updRes.dhtFuture(); } if (retVal == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/185c28ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index fa7020b..553f8cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -52,11 +52,15 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT // Obtain the topology version to use. long threadId = Thread.currentThread().getId(); - AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); + AffinityTopologyVersion topVer; - // If there is another system transaction in progress, use it's topology version to prevent deadlock. - if (topVer == null && tx != null && tx.system()) - topVer = cctx.tm().lockedTopologyVersion(threadId, tx); + if (tx != null && tx.system()) { + topVer = cctx.exchange().readyAffinityVersion(); + + assert topVer != null && topVer.topologyVersion() > 0 : topVer; + } + else + topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); if (topVer != null) { tx.topologyVersion(topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/185c28ae/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java index c95c586..ddfe7fb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java @@ -25,10 +25,12 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; +import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -105,7 +107,7 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm IgniteCache<Object, Object> cache = ignite(0).cache("cache").withAsync(); - cache.putAll(F.asMap(key1, "val1", key2, new TestValue())); + cache.putAll(F.asMap(key1, "val1", key2, new TestValue1())); try { Thread.sleep(500); @@ -118,8 +120,47 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm } }); + Thread.sleep(1000); + + spi.stopBlock(); + + cache.future().get(); + + fut.get(); + } + finally { + stopGrid(4); + } + } + + /** + * @throws Exception If failed. + */ + public void testNoDeadlockInvoke() throws Exception { + int key1 = primaryKey(ignite(1).cache("cache")); + int key2 = primaryKey(ignite(2).cache("cache")); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite(1).configuration().getCommunicationSpi(); + + spi.blockMessages(GridNearTxPrepareResponse.class, ignite(0).cluster().localNode().id()); + + IgniteCache<Object, Object> cache = ignite(0).cache("cache").withAsync(); + + cache.invokeAll(F.asSet(key1, key2), new TestEntryProcessor()); + + try { Thread.sleep(500); + IgniteInternalFuture<Void> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(4); + + return null; + } + }); + + Thread.sleep(1000); + spi.stopBlock(); cache.future().get(); @@ -145,12 +186,6 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm /** */ private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>(); - /** */ - private Class<?> recordCls; - - /** */ - private List<Object> recordedMsgs = new ArrayList<>(); - /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { @@ -158,9 +193,6 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm Object msg0 = ((GridIoMessage)msg).message(); synchronized (this) { - if (recordCls != null && msg0.getClass().equals(recordCls)) - recordedMsgs.add(msg0); - Set<UUID> blockNodes = blockCls.get(msg0.getClass()); if (F.contains(blockNodes, node.id())) { @@ -178,28 +210,6 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm } /** - * @param recordCls Message class to record. - */ - void record(@Nullable Class<?> recordCls) { - synchronized (this) { - this.recordCls = recordCls; - } - } - - /** - * @return Recorded messages. - */ - List<Object> recordedMessages() { - synchronized (this) { - List<Object> msgs = recordedMsgs; - - recordedMsgs = new ArrayList<>(); - - return msgs; - } - } - - /** * @param cls Message class. * @param nodeId Node ID. */ @@ -241,7 +251,27 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm /** * */ - private static class TestValue { + static class TestEntryProcessor implements CacheEntryProcessor<Object, Object, Object> { + /** {@inheritDoc} */ + @Override public Object process(MutableEntry<Object, Object> e, Object... arguments) { + e.setValue(new TestValue2()); + + return null; + } + } + + /** + * + */ + private static class TestValue1 { + /** Field1. */ + private String field1; + } + + /** + * + */ + private static class TestValue2 { /** Field1. */ private String field1; }