ignite-1.5 Fixed a hang on metadata update inside a put into an atomic cache when the topology read lock is held. Also fixed several test issues.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53ec76ff Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53ec76ff Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53ec76ff Branch: refs/heads/ignite-1537 Commit: 53ec76ffe65d5788fc1ffa32c2fba66222e51dcc Parents: 66b33bc Author: sboikov <sboi...@gridgain.com> Authored: Wed Dec 23 15:06:48 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Dec 23 15:06:48 2015 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 39 +- .../processors/cache/GridCacheAdapter.java | 47 ++- .../processors/cache/GridCacheProxyImpl.java | 29 ++ .../cache/GridCacheSharedContext.java | 10 +- .../processors/cache/IgniteCacheProxy.java | 35 ++ .../processors/cache/IgniteInternalCache.java | 26 ++ .../binary/CacheObjectBinaryProcessorImpl.java | 4 +- .../distributed/dht/GridDhtLockFuture.java | 2 +- .../dht/atomic/GridDhtAtomicCache.java | 95 +++-- .../dht/atomic/GridNearAtomicUpdateFuture.java | 150 ++++--- .../colocated/GridDhtColocatedLockFuture.java | 11 +- .../distributed/near/GridNearLockFuture.java | 11 +- ...arOptimisticSerializableTxPrepareFuture.java | 5 +- .../near/GridNearOptimisticTxPrepareFuture.java | 5 +- ...ridNearOptimisticTxPrepareFutureAdapter.java | 12 +- .../transactions/IgniteTxLocalAdapter.java | 2 + .../cache/transactions/IgniteTxManager.java | 61 ++- .../datastreamer/DataStreamProcessor.java | 12 +- .../ignite/internal/util/lang/GridFunc.java | 1 + .../test/config/websession/example-cache.xml | 9 +- ...niteClientReconnectFailoverAbstractTest.java | 3 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 22 +- .../cache/GridCacheAbstractSelfTest.java | 3 +- ...yMetadataUpdateChangingTopologySelfTest.java | 7 +- ...niteBinaryMetadataUpdateNodeRestartTest.java | 411 +++++++++++++++++++ .../distributed/IgniteCacheManyClientsTest.java | 2 + 
...ContinuousQueryFailoverAbstractSelfTest.java | 128 +++--- ...ridCacheContinuousQueryAbstractSelfTest.java | 3 + .../service/ClosureServiceClientsNodesTest.java | 22 +- .../GridServiceProcessorStopSelfTest.java | 21 +- ...cpCommunicationSpiMultithreadedSelfTest.java | 21 + .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 14 +- .../testframework/GridSpiTestContext.java | 18 +- .../cache/websession/WebSessionFilter.java | 82 ++-- .../cache/websession/WebSessionListener.java | 25 +- .../internal/websession/WebSessionSelfTest.java | 2 - 36 files changed, 1023 insertions(+), 327 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index bf7c7e4..42f8dae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -666,6 +666,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * * @param plc Policy. * @return Execution pool. + * @throws IgniteCheckedException If failed. */ private Executor pool(byte plc) throws IgniteCheckedException { switch (plc) { @@ -767,6 +768,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param msg Message. * @param plc Execution policy. * @param msgC Closure to call when message processing finished. + * @throws IgniteCheckedException If failed. 
*/ private void processRegularMessage( final UUID nodeId, @@ -824,6 +826,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param msg Ordered message. * @param plc Execution policy. * @param msgC Closure to call when message processing finished ({@code null} for sync processing). + * @throws IgniteCheckedException If failed. */ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") private void processOrderedMessage( @@ -1029,7 +1032,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param ordered Ordered flag. * @param timeout Timeout. * @param skipOnTimeout Whether message can be skipped on timeout. - * @param ackClosure Ack closure. + * @param ackC Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ private void send( @@ -1041,7 +1044,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa boolean ordered, long timeout, boolean skipOnTimeout, - IgniteInClosure<IgniteException> ackClosure + IgniteInClosure<IgniteException> ackC ) throws IgniteCheckedException { assert node != null; assert topic != null; @@ -1062,8 +1065,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa else processRegularMessage0(ioMsg, locNodeId); - if (ackClosure != null) - ackClosure.apply(null); + if (ackC != null) + ackC.apply(null); } else { if (topicOrd < 0) @@ -1071,7 +1074,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa try { if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi) - ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackClosure); + ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackC); else getSpi().sendMessage(node, ioMsg); } @@ -1197,12 +1200,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param topic Topic to send the message to. 
* @param msg Message to send. * @param plc Type of processing. - * @param ackClosure Ack closure. + * @param ackC Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, - IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackClosure); + IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC); } /** @@ -1233,12 +1236,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param topic Topic to send the message to. * @param msg Message to send. * @param plc Type of processing. - * @param ackClosure Ack closure. + * @param ackC Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackClosure) + public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, 0, false, ackClosure); + send(node, topic, -1, msg, plc, false, 0, false, ackC); } /** @@ -1280,7 +1283,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param plc Type of processing. * @param timeout Timeout to keep a message on receiving queue. * @param skipOnTimeout Whether message can be skipped on timeout. - * @param ackClosure Ack closure. + * @param ackC Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. 
*/ public void sendOrderedMessage( @@ -1290,11 +1293,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa byte plc, long timeout, boolean skipOnTimeout, - IgniteInClosure<IgniteException> ackClosure + IgniteInClosure<IgniteException> ackC ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC); } /** @@ -1385,6 +1388,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param topic Topic to subscribe to. * @param p Message predicate. */ + @SuppressWarnings("unchecked") public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) { if (p != null) { try { @@ -1406,6 +1410,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param topic Topic to unsubscribe from. * @param p Message predicate. */ + @SuppressWarnings("unchecked") public void removeUserMessageListener(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p) { try { removeMessageListener(TOPIC_COMM_USER, @@ -1423,7 +1428,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param plc Type of processing. * @param timeout Timeout to keep a message on receiving queue. * @param skipOnTimeout Whether message can be skipped on timeout. - * @param ackClosure Ack closure. + * @param ackC Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. 
*/ public void sendOrderedMessage( @@ -1433,7 +1438,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa byte plc, long timeout, boolean skipOnTimeout, - IgniteInClosure<IgniteException> ackClosure + IgniteInClosure<IgniteException> ackC ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; @@ -1442,7 +1447,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index cc4e962..5d4c386 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2077,8 +2077,32 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ + @Nullable @Override public <T> EntryProcessorResult<T> invoke(@Nullable AffinityTopologyVersion topVer, + K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) throws IgniteCheckedException { + return invoke0(topVer, key, entryProcessor, args); + } + + /** {@inheritDoc} */ @Override public <T> EntryProcessorResult<T> invoke(final K key, final EntryProcessor<K, V, T> entryProcessor, + final Object... 
args) throws IgniteCheckedException { + return invoke0(null, key, entryProcessor, args); + } + + /** + * @param topVer Locked topology version. + * @param key Key. + * @param entryProcessor Entry processor. + * @param args Entry processor arguments. + * @return Invoke result. + * @throws IgniteCheckedException If failed. + */ + private <T> EntryProcessorResult<T> invoke0( + @Nullable final AffinityTopologyVersion topVer, + final K key, + final EntryProcessor<K, V, T> entryProcessor, final Object... args) throws IgniteCheckedException { A.notNull(key, "key", entryProcessor, "entryProcessor"); @@ -2089,8 +2113,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return syncOp(new SyncOp<EntryProcessorResult<T>>(true) { @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - IgniteInternalFuture<GridCacheReturn> fut = - tx.invokeAsync(ctx, key, (EntryProcessor<K, V, Object>)entryProcessor, args); + assert topVer == null || tx.implicit(); + + if (topVer != null) + tx.topologyVersion(topVer); + + IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, + key, + (EntryProcessor<K, V, Object>)entryProcessor, + args); Map<K, EntryProcessorResult<T>> resMap = fut.get().value(); @@ -2324,16 +2355,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V }); } - /** - * Tries to put value in cache. Will fail with {@link GridCacheTryPutFailedException} - * if topology exchange is in progress. - * - * @param key Key. - * @param val value. - * @return Old value. - * @throws IgniteCheckedException In case of error. - */ - @Nullable public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Nullable @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException { // Supported only in ATOMIC cache. 
throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index d1d93d8..8ffd273 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -36,6 +36,7 @@ import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityProxy; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -1231,6 +1232,34 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Nullable @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.tryPutIfAbsent(key, val); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> EntryProcessorResult<T> invoke( + AffinityTopologyVersion topVer, + K key, + EntryProcessor<K, V, T> entryProcessor, + Object... 
args) throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.invoke(topVer, key, entryProcessor, args); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public void removeAll() throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 5ed1df9..2221d3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -91,9 +91,6 @@ public class GridCacheSharedContext<K, V> { /** Tx metrics. */ private volatile TransactionMetricsAdapter txMetrics; - /** Preloaders start future. */ - private IgniteInternalFuture<Object> preloadersStartFut; - /** Store session listeners. 
*/ private Collection<CacheStoreSessionListener> storeSesLsnrs; @@ -578,12 +575,7 @@ public class GridCacheSharedContext<K, V> { @Nullable public AffinityTopologyVersion lockedTopologyVersion(IgniteInternalTx ignore) { long threadId = Thread.currentThread().getId(); - IgniteInternalTx tx = txMgr.anyActiveThreadTx(threadId, ignore); - - AffinityTopologyVersion topVer = null; - - if (tx != null && tx.topologyVersionSnapshot() != null) - topVer = tx.topologyVersionSnapshot(); + AffinityTopologyVersion topVer = txMgr.lockedTopologyVersion(threadId, ignore); if (topVer == null) topVer = mvccMgr.lastExplicitLockTopologyVersion(threadId); http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 271a2cf..27a7587 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.AsyncSupportAdapter; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.query.CacheQuery; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.query.GridQueryProcessor; @@ -1483,6 +1484,40 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return invoke(key, (EntryProcessor<K, V, T>)entryProcessor, args); } + /** + * 
@param topVer Locked topology version. + * @param key Key. + * @param entryProcessor Entry processor. + * @param args Arguments. + * @return Invoke result. + */ + public <T> T invoke(@Nullable AffinityTopologyVersion topVer, + K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) { + try { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + if (isAsync()) + throw new UnsupportedOperationException(); + else { + EntryProcessorResult<T> res = delegate.invoke(topVer, key, entryProcessor, args); + + return res != null ? res.get() : null; + } + } + finally { + onLeave(gate, prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 186de68..433290c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -40,6 +40,7 @@ import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -1863,4 +1864,29 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws IgniteCheckedException If failed. */ public V getTopologySafe(K key) throws IgniteCheckedException; + + /** + * Tries to put value in cache. Will fail with {@link GridCacheTryPutFailedException} + * if topology exchange is in progress. + * + * @param key Key. + * @param val value. + * @return Old value. + * @throws IgniteCheckedException In case of error. + */ + @Nullable public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException; + + /** + * @param topVer Locked topology version. + * @param key Key. + * @param entryProcessor Entry processor. + * @param args Arguments. + * @return Invoke result. + * @throws IgniteCheckedException If failed. + */ + @Nullable public <T> EntryProcessorResult<T> invoke( + @Nullable AffinityTopologyVersion topVer, + K key, + EntryProcessor<K, V, T> entryProcessor, + Object... 
args) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index b335179..7586a42 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -489,7 +489,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm BinaryMetadata oldMeta = metaDataCache.localPeek(key); BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0); - BinaryObjectException err = metaDataCache.invoke(key, new MetadataProcessor(mergedMeta)); + AffinityTopologyVersion topVer = ctx.cache().context().lockedTopologyVersion(null); + + BinaryObjectException err = metaDataCache.invoke(topVer, key, new MetadataProcessor(mergedMeta)); if (err != null) throw err; http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index f0d2e15..98711b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -743,7 +743,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> if (tx != null) { cctx.tm().txContext(tx); - set = cctx.tm().setTxTopologyHint(tx); + set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot()); } try { http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 481317a..634a9ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -77,6 +77,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; @@ -1240,7 +1241,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { top.readLock(); try { - if (topology().stopping()) { + if (top.stopping()) { 
res.addFailedKeys(keys, new IgniteCheckedException("Failed to perform cache operation " + "(cache is stopped): " + name())); @@ -1289,48 +1290,59 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheReturn retVal = null; - if (keys.size() > 1 && // Several keys ... - writeThrough() && !req.skipStore() && // and store is enabled ... - !ctx.store().isLocal() && // and this is not local store ... - !ctx.dr().receiveEnabled() // and no DR. - ) { - // This method can only be used when there are no replicated entries in the batch. - UpdateBatchResult updRes = updateWithBatch(node, - hasNear, - req, - res, - locked, - ver, - dhtFut, - completionCb, - ctx.isDrEnabled(), - taskName, - expiry, - sndPrevVal); + IgniteTxManager tm = ctx.tm(); - deleted = updRes.deleted(); - dhtFut = updRes.dhtFuture(); + // Needed for metadata cache transaction. + boolean set = tm.setTxTopologyHint(req.topologyVersion()); - if (req.operation() == TRANSFORM) - retVal = updRes.invokeResults(); + try { + if (keys.size() > 1 && // Several keys ... + writeThrough() && !req.skipStore() && // and store is enabled ... + !ctx.store().isLocal() && // and this is not local store ... + !ctx.dr().receiveEnabled() // and no DR. + ) { + // This method can only be used when there are no replicated entries in the batch. 
+ UpdateBatchResult updRes = updateWithBatch(node, + hasNear, + req, + res, + locked, + ver, + dhtFut, + completionCb, + ctx.isDrEnabled(), + taskName, + expiry, + sndPrevVal); + + deleted = updRes.deleted(); + dhtFut = updRes.dhtFuture(); + + if (req.operation() == TRANSFORM) + retVal = updRes.invokeResults(); + } + else { + UpdateSingleResult updRes = updateSingle(node, + hasNear, + req, + res, + locked, + ver, + dhtFut, + completionCb, + ctx.isDrEnabled(), + taskName, + expiry, + sndPrevVal); + + retVal = updRes.returnValue(); + deleted = updRes.deleted(); + dhtFut = updRes.dhtFuture(); + } } - else { - UpdateSingleResult updRes = updateSingle(node, - hasNear, - req, - res, - locked, - ver, - dhtFut, - completionCb, - ctx.isDrEnabled(), - taskName, - expiry, - sndPrevVal); - - retVal = updRes.returnValue(); - deleted = updRes.deleted(); - dhtFut = updRes.dhtFuture(); + finally { + if (set) + tm.setTxTopologyHint(null); } if (retVal == null) @@ -2782,8 +2794,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (log.isDebugEnabled()) log.debug("Processing dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); - GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc(). 
- atomicFuture(res.futureVersion()); + GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); if (updateFut != null) updateFut.onResult(nodeId, res); http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index eefdc73..3c86083 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.UUID; @@ -47,8 +46,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; 
import org.apache.ignite.internal.util.typedef.CI1; @@ -288,7 +287,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> // Cannot remap. remapCnt = 1; - state.map(topVer); + state.map(topVer, null); } } @@ -415,7 +414,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> cache.topology().readUnlock(); } - state.map(topVer); + state.map(topVer, null); } /** @@ -582,7 +581,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> req = mappings != null ? mappings.get(nodeId) : null; if (req != null && req.response() == null) { - res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion(), + res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + nodeId, + req.futureVersion(), cctx.deploymentEnabled()); ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " + @@ -603,6 +604,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param res Response. * @param nodeErr {@code True} if response was created on node failure. 
*/ + @SuppressWarnings("unchecked") void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { GridNearAtomicUpdateRequest req; @@ -774,7 +776,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return; } - IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.affinity().affinityReadyFuture(remapTopVer); + IgniteInternalFuture<AffinityTopologyVersion> fut = + cctx.shared().exchange().affinityReadyFuture(remapTopVer); + + if (fut == null) + fut = new GridFinishedFuture<>(remapTopVer); fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { @@ -783,7 +789,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> try { AffinityTopologyVersion topVer = fut.get(); - map(topVer); + map(topVer, remapKeys); } catch (IgniteCheckedException e) { onDone(e); @@ -819,8 +825,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** * @param topVer Topology version. + * @param remapKeys Keys to remap. */ - void map(AffinityTopologyVersion topVer) { + void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) { Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { @@ -832,68 +839,78 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> Exception err = null; GridNearAtomicUpdateRequest singleReq0 = null; - Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null; + Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null; int size = keys.size(); - synchronized (this) { - assert futVer == null : this; - assert this.topVer == AffinityTopologyVersion.ZERO : this; + GridCacheVersion futVer = cctx.versions().next(topVer); - resCnt = 0; + GridCacheVersion updVer; - this.topVer = topVer; + // Assign version on near node in CLOCK ordering mode even if fastMap is false. 
+ if (cctx.config().getAtomicWriteOrderMode() == CLOCK) { + updVer = this.updVer; - futVer = cctx.versions().next(topVer); + if (updVer == null) { + updVer = cctx.versions().next(topVer); - if (storeFuture()) { - if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) { - assert isDone() : GridNearAtomicUpdateFuture.this; - - return; - } + if (log.isDebugEnabled()) + log.debug("Assigned fast-map version for update on near node: " + updVer); } + } + else + updVer = null; - // Assign version on near node in CLOCK ordering mode even if fastMap is false. - if (updVer == null) - updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null; + try { + if (size == 1 && !fastMap) { + assert remapKeys == null || remapKeys.size() == 1; - if (updVer != null && log.isDebugEnabled()) - log.debug("Assigned fast-map version for update on near node: " + updVer); + singleReq0 = mapSingleUpdate(topVer, futVer, updVer); + } + else { + Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes, + topVer, + futVer, + updVer, + remapKeys); + + if (pendingMappings.size() == 1) + singleReq0 = F.firstValue(pendingMappings); + else { + if (syncMode == PRIMARY_SYNC) { + mappings0 = U.newHashMap(pendingMappings.size()); - try { - if (size == 1 && !fastMap) { - assert remapKeys == null || remapKeys.size() == 1; + for (GridNearAtomicUpdateRequest req : pendingMappings.values()) { + if (req.hasPrimary()) + mappings0.put(req.nodeId(), req); + } + } + else + mappings0 = pendingMappings; - singleReq0 = singleReq = mapSingleUpdate(); + assert !mappings0.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this; } - else { - pendingMappings = mapUpdate(topNodes); + } - if (pendingMappings.size() == 1) - singleReq0 = singleReq = F.firstValue(pendingMappings); - else { - if (syncMode == PRIMARY_SYNC) { - mappings = U.newHashMap(pendingMappings.size()); + synchronized (this) { + assert this.futVer == null : this; + assert this.topVer 
== AffinityTopologyVersion.ZERO : this; - for (GridNearAtomicUpdateRequest req : pendingMappings.values()) { - if (req.hasPrimary()) - mappings.put(req.nodeId(), req); - } - } - else - mappings = new HashMap<>(pendingMappings); + this.topVer = topVer; + this.updVer = updVer; + this.futVer = futVer; - assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this; - } - } + resCnt = 0; - remapKeys = null; - } - catch (Exception e) { - err = e; + singleReq = singleReq0; + mappings = mappings0; + + this.remapKeys = null; } } + catch (Exception e) { + err = e; + } if (err != null) { onDone(err); @@ -901,16 +918,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return; } + if (storeFuture()) { + if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) { + assert isDone() : GridNearAtomicUpdateFuture.this; + + return; + } + } + // Optimize mapping for single key. if (singleReq0 != null) mapSingle(singleReq0.nodeId(), singleReq0); else { - assert pendingMappings != null; + assert mappings0 != null; if (size == 0) onDone(new GridCacheReturn(cctx, true, true, null, true)); else - doUpdate(pendingMappings); + doUpdate(mappings0); } } @@ -958,10 +983,18 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** * @param topNodes Cache nodes. + * @param topVer Topology version. + * @param futVer Future version. + * @param updVer Update version. + * @param remapKeys Keys to remap. * @return Mapping. * @throws Exception If failed. 
*/ - private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes) throws Exception { + private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes, + AffinityTopologyVersion topVer, + GridCacheVersion futVer, + @Nullable GridCacheVersion updVer, + @Nullable Collection<KeyCacheObject> remapKeys) throws Exception { Iterator<?> it = null; if (vals != null) @@ -999,7 +1032,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> throw new NullPointerException("Null value."); } else if (conflictPutVals != null) { - GridCacheDrInfo conflictPutVal = conflictPutValsIt.next(); + GridCacheDrInfo conflictPutVal = conflictPutValsIt.next(); val = conflictPutVal.value(); conflictVer = conflictPutVal.version(); @@ -1082,10 +1115,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } /** + * @param topVer Topology version. + * @param futVer Future version. + * @param updVer Update version. * @return Request. * @throws Exception If failed. 
*/ - private GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception { + private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer, + GridCacheVersion futVer, + @Nullable GridCacheVersion updVer) throws Exception { Object key = F.first(keys); Object val; http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 22b329c..a5f5286 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -48,7 +48,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -596,12 +595,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); // If 
there is another system transaction in progress, use it's topology version to prevent deadlock. - if (topVer == null && tx != null && tx.system()) { - IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(Thread.currentThread().getId(), tx); - - if (tx0 != null) - topVer = tx0.topologyVersionSnapshot(); - } + if (topVer == null && tx != null && tx.system()) + topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), tx); if (topVer != null && tx != null) tx.topologyVersion(topVer); @@ -980,7 +975,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture * @throws IgniteCheckedException If failed. */ private void proceedMapping() throws IgniteCheckedException { - boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx); + boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx.topologyVersionSnapshot()); try { proceedMapping0(); http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 23e0f6b..55c5ab6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -46,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -723,12 +722,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); // If there is another system transaction in progress, use it's topology version to prevent deadlock. - if (topVer == null && tx != null && tx.system()) { - IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx); - - if (tx0 != null) - topVer = tx0.topologyVersionSnapshot(); - } + if (topVer == null && tx != null && tx.system()) + topVer = cctx.tm().lockedTopologyVersion(threadId, tx); if (topVer != null && tx != null) tx.topologyVersion(topVer); @@ -1098,7 +1093,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean * @throws IgniteCheckedException If failed. 
*/ private void proceedMapping() throws IgniteCheckedException { - boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx); + boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx.topologyVersionSnapshot()); try { proceedMapping0(); http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index f52b3fc..37dc564 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -84,7 +84,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim * @param cctx Context. * @param tx Transaction. 
*/ - public GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) { + public GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx, + GridNearTxLocal tx) { super(cctx, tx); assert tx.optimistic() && tx.serializable() : tx; @@ -304,7 +305,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim return; } - boolean set = cctx.tm().setTxTopologyHint(tx); + boolean set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot()); try { prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked); http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 2ce14af..a9f158a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -72,7 +72,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @param cctx Context. * @param tx Transaction. 
*/ - public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) { + public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, + GridNearTxLocal tx) { super(cctx, tx); assert tx.optimistic() && !tx.serializable() : tx; @@ -405,7 +406,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa if (isDone()) return; - boolean set = cctx.tm().setTxTopologyHint(tx); + boolean set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot()); try { assert !m.empty(); http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index b3eab34..fa7020b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -23,7 +23,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ 
-41,7 +40,8 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT * @param cctx Context. * @param tx Transaction. */ - public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx, GridNearTxLocal tx) { + public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx, + GridNearTxLocal tx) { super(cctx, tx); assert tx.optimistic() : tx; @@ -55,12 +55,8 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); // If there is another system transaction in progress, use it's topology version to prevent deadlock. - if (topVer == null && tx != null && tx.system()) { - IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx); - - if (tx0 != null) - topVer = tx0.topologyVersionSnapshot(); - } + if (topVer == null && tx != null && tx.system()) + topVer = cctx.tm().lockedTopologyVersion(threadId, tx); if (topVer != null) { tx.topologyVersion(topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 720832e..70c79a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -3203,8 +3203,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param cacheCtx Cache context. * @param loadFut Missing keys load future. * @param ret Future result. 
+ * @param keepBinary Keep binary flag. * @return Future. */ private IgniteInternalFuture optimisticPutFuture( http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index d2b803a..d384e4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -114,8 +114,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** Committing transactions. */ private final ThreadLocal<IgniteInternalTx> threadCtx = new ThreadLocal<>(); - /** Transaction which topology version should be used when mapping internal tx. */ - private final ThreadLocal<IgniteInternalTx> txTopology = new ThreadLocal<>(); + /** Topology version should be used when mapping internal tx. */ + private final ThreadLocal<AffinityTopologyVersion> txTop = new ThreadLocal<>(); /** Per-thread transaction map. */ private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap(); @@ -130,7 +130,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { private final ConcurrentMap<GridCacheVersion, IgniteInternalTx> nearIdMap = newMap(); /** TX handler. */ - private IgniteTxHandler txHandler; + private IgniteTxHandler txHnd; /** Committed local transactions. 
*/ private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVersSorted = @@ -197,7 +197,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @Override protected void start0() throws IgniteCheckedException { txFinishSync = new GridCacheTxFinishSync<>(cctx); - txHandler = new IgniteTxHandler(cctx); + txHnd = new IgniteTxHandler(cctx); } /** {@inheritDoc} */ @@ -212,7 +212,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return TX handler. */ public IgniteTxHandler txHandler() { - return txHandler; + return txHnd; } /** @@ -607,13 +607,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** * @param threadId Thread ID. * @param ignore Transaction to ignore. - * @return Any transaction associated with the current thread. + * @return Not null topology version if current thread holds lock preventing topology change. */ - public IgniteInternalTx anyActiveThreadTx(long threadId, IgniteInternalTx ignore) { + @Nullable public AffinityTopologyVersion lockedTopologyVersion(long threadId, IgniteInternalTx ignore) { IgniteInternalTx tx = threadMap.get(threadId); - if (tx != null && tx.topologyVersionSnapshot() != null) - return tx; + if (tx != null) { + AffinityTopologyVersion topVer = tx.topologyVersionSnapshot(); + + if (topVer != null) + return topVer; + } for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) { if (!cacheCtx.systemTx()) @@ -621,22 +625,27 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId())); - if (tx != null && tx != ignore && tx.topologyVersionSnapshot() != null) - return tx; + if (tx != null && tx != ignore) { + AffinityTopologyVersion topVer = tx.topologyVersionSnapshot(); + + if (topVer != null) + return topVer; + } } - return txTopology.get(); + return txTop.get(); } /** - * @param tx Transaction. + * @param topVer Locked topology version. 
+ * @return {@code True} if topology hint was set. */ - public boolean setTxTopologyHint(IgniteInternalTx tx) { - if (tx == null) - txTopology.remove(); + public boolean setTxTopologyHint(@Nullable AffinityTopologyVersion topVer) { + if (topVer == null) + txTop.remove(); else { - if (txTopology.get() == null) { - txTopology.set(tx); + if (txTop.get() == null) { + txTop.set(topVer); return true; } @@ -1807,8 +1816,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { this.evtNodeId = evtNodeId; } - /** {@inheritDoc} */ - @Override public void onTimeout() { + /** + * + */ + private void onTimeout0() { try { cctx.kernalContext().gateway().readLock(); } @@ -1861,6 +1872,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { cctx.kernalContext().gateway().readUnlock(); } } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + // Should not block timeout thread. + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + onTimeout0(); + } + }); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index a2aab77..da39209 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -62,6 +62,9 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { /** Marshaller. */ private final Marshaller marsh; + /** */ + private byte[] marshErrBytes; + /** * @param ctx Kernal context. 
*/ @@ -86,6 +89,9 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { if (ctx.config().isDaemon()) return; + marshErrBytes = marsh.marshal(new IgniteCheckedException("Failed to marshal response error, " + + "see node log for details.")); + flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) { @Override protected void body() throws InterruptedException { while (!isCancelled()) { @@ -324,10 +330,10 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { try { errBytes = err != null ? marsh.marshal(err) : null; } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal message.", e); + catch (Exception e) { + U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e); - return; + errBytes = marshErrBytes; } DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep); http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 8d5a8e7..8eeca6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -3408,6 +3408,7 @@ public class GridFunc { * @return First element in given collection for which predicate evaluates to * {@code true} - or {@code null} if such element cannot be found. */ + @SafeVarargs @Nullable public static <V> V find(Iterable<? extends V> c, @Nullable V dfltVal, @Nullable IgnitePredicate<? super V>... 
p) { A.notNull(c, "c"); http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/config/websession/example-cache.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/websession/example-cache.xml b/modules/core/src/test/config/websession/example-cache.xml index d5bfeb7..0cc0e1e 100644 --- a/modules/core/src/test/config/websession/example-cache.xml +++ b/modules/core/src/test/config/websession/example-cache.xml @@ -130,14 +130,7 @@ <property name="discoverySpi"> <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> <property name="ipFinder"> - <!-- - Ignite provides several options for automatic discovery that can be used - instead os static IP based discovery. For information on all options refer - to our documentation: http://apacheignite.readme.io/docs/cluster-config - --> - <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. --> - <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> - <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> <property name="addresses"> <list> <!-- In distributed environment, replace with actual host IP address. 
--> http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java index f050c72..7e217b7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java @@ -117,7 +117,8 @@ public abstract class IgniteClientReconnectFailoverAbstractTest extends IgniteCl } return null; - } catch (Throwable e) { + } + catch (Throwable e) { log.error("Unexpected error in operation thread: " + e, e); stop.set(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 5b294cc..0d9c541 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -3277,9 +3277,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract * @throws Exception If failed. 
*/ public void testPeekExpired() throws Exception { - IgniteCache<String, Integer> c = jcache(); + final IgniteCache<String, Integer> c = jcache(); - String key = primaryKeysForCache(c, 1).get(0); + final String key = primaryKeysForCache(c, 1).get(0); info("Using key: " + key); @@ -3295,6 +3295,12 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract Thread.sleep(ttl + 100); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return peek(c, key) == null; + } + }, 2000); + assert peek(c, key) == null; assert c.localSize() == 0 : "Cache is not empty."; @@ -3307,9 +3313,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract */ public void testPeekExpiredTx() throws Exception { if (txShouldBeUsed()) { - IgniteCache<String, Integer> c = jcache(); + final IgniteCache<String, Integer> c = jcache(); - String key = "1"; + final String key = "1"; int ttl = 500; try (Transaction tx = grid(0).transactions().txStart()) { @@ -3320,9 +3326,13 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract tx.commit(); } - Thread.sleep(ttl + 100); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return peek(c, key) == null; + } + }, 2000); - assertNull(c.localPeek(key, ONHEAP)); + assertNull(peek(c, key)); assert c.localSize() == 0; } http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java index 52fbf4c..b3d1384 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java @@ -416,9 +416,8 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest { * @param cache Cache projection. * @param key Key. * @return Value. - * @throws Exception If failed. */ - @Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) throws Exception { + @Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) { return offheapTiered(cache) ? cache.localPeek(key, CachePeekMode.SWAP, CachePeekMode.OFFHEAP) : cache.localPeek(key, CachePeekMode.ONHEAP); } http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java index e53650c..c95c586 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java @@ -152,7 +152,7 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm private List<Object> recordedMsgs = new ArrayList<>(); /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) 
throws IgniteSpiException { if (msg instanceof GridIoMessage) { Object msg0 = ((GridIoMessage)msg).message(); @@ -174,7 +174,7 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm } } - super.sendMessage(node, msg, ackClosure); + super.sendMessage(node, msg, ackC); } /** @@ -238,6 +238,9 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm } } + /** + * + */ private static class TestValue { /** Field1. */ private String field1; http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java new file mode 100644 index 0000000..814fb08 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.CacheException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteBinaryMetadataUpdateNodeRestartTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String ATOMIC_CACHE = "atomicCache"; + 
+ /** */ + private static final String TX_CACHE = "txCache"; + + /** */ + private static final int SRVS = 3; + + /** */ + private static final int CLIENTS = 1; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + cfg.setMarshaller(null); + + CacheConfiguration ccfg1 = cacheConfiguration(TX_CACHE, TRANSACTIONAL); + CacheConfiguration ccfg2 = cacheConfiguration(ATOMIC_CACHE, ATOMIC); + + cfg.setCacheConfiguration(ccfg1, ccfg2); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @param name Cache name. + * @param atomicityMode Cache atomicity mode. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setName(name); + ccfg.setAtomicityMode(atomicityMode); + + return ccfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testNodeRestart() throws Exception { + for (int i = 0; i < 10; i++) { + log.info("Iteration: " + i); + + client = false; + + startGridsMultiThreaded(SRVS); + + client = true; + + startGrid(SRVS); + + final AtomicBoolean stop = new AtomicBoolean(); + + try { + IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while (!stop.get()) { + log.info("Start node."); + + startGrid(SRVS + CLIENTS); + + log.info("Stop node."); + + stopGrid(SRVS + CLIENTS); + } + + return null; + } + }, "restart-thread"); + + final AtomicInteger idx = new AtomicInteger(); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int threadIdx = idx.getAndIncrement(); + + int node = threadIdx % (SRVS + CLIENTS); + + Ignite ignite = ignite(node); + + log.info("Started thread: " + ignite.name()); + + Thread.currentThread().setName("update-thread-" + threadIdx + "-" + ignite.name()); + + IgniteCache<Object, Object> cache1 = ignite.cache(ATOMIC_CACHE); + IgniteCache<Object, Object> cache2 = ignite.cache(TX_CACHE); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + try { + cache1.put(new TestClass1(true), create(rnd.nextInt(20) + 1)); + + cache1.invoke(new TestClass1(true), new TestEntryProcessor(rnd.nextInt(20) + 1)); + + cache2.put(new TestClass1(true), create(rnd.nextInt(20) + 1)); + + cache2.invoke(new TestClass1(true), new TestEntryProcessor(rnd.nextInt(20) + 1)); + } + catch (CacheException | IgniteException e) { + log.info("Error: " + e); + + if (X.hasCause(e, ClusterTopologyException.class)) { + ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class); + + if (cause.retryReadyFuture() != null) + cause.retryReadyFuture().get(); + } + } + } + + return null; + } + }, 10, "update-thread"); + + U.sleep(5_000); + + stop.set(true); + + restartFut.get(); + + 
fut.get(); + } + finally { + stop.set(true); + + stopAllGrids(); + } + } + } + + /** + * @param id Class ID. + * @return Test class instance. + */ + private static Object create(int id) { + switch (id) { + case 1: return new TestClass1(true); + + case 2: return new TestClass2(); + + case 3: return new TestClass3(); + + case 4: return new TestClass4(); + + case 5: return new TestClass5(); + + case 6: return new TestClass6(); + + case 7: return new TestClass7(); + + case 8: return new TestClass8(); + + case 9: return new TestClass9(); + + case 10: return new TestClass10(); + + case 11: return new TestClass11(); + + case 12: return new TestClass12(); + + case 13: return new TestClass13(); + + case 14: return new TestClass14(); + + case 15: return new TestClass15(); + + case 16: return new TestClass16(); + + case 17: return new TestClass17(); + + case 18: return new TestClass18(); + + case 19: return new TestClass19(); + + case 20: return new TestClass20(); + } + + fail(); + + return null; + } + + /** + * + */ + static class TestEntryProcessor implements CacheEntryProcessor<Object, Object, Object> { + /** */ + private int id; + + /** + * @param id Value id. + */ + public TestEntryProcessor(int id) { + this.id = id; + } + + /** {@inheritDoc} */ + @Override public Object process(MutableEntry<Object, Object> entry, Object... args) { + entry.setValue(create(id)); + + return null; + } + } + + /** + * + */ + static class TestClass1 { + /** */ + int val; + + /** + * @param setVal Set value flag. + */ + public TestClass1(boolean setVal) { + this.val = setVal ? 
ThreadLocalRandom.current().nextInt(10_000) : 0; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestClass1 that = (TestClass1)o; + + return val == that.val; + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + } + + /** + * + */ + static class TestClass2 {} + + /** + * + */ + static class TestClass3 {} + + /** + * + */ + static class TestClass4 {} + + /** + * + */ + static class TestClass5 {} + + /** + * + */ + static class TestClass6 {} + + /** + * + */ + static class TestClass7 {} + + /** + * + */ + static class TestClass8 {} + + /** + * + */ + static class TestClass9 {} + + /** + * + */ + static class TestClass10 {} + + /** + * + */ + static class TestClass11 {} + + /** + * + */ + static class TestClass12 {} + + /** + * + */ + static class TestClass13 {} + + /** + * + */ + static class TestClass14 {} + + /** + * + */ + static class TestClass15 {} + + /** + * + */ + static class TestClass16 {} + + /** + * + */ + static class TestClass17 {} + + /** + * + */ + static class TestClass18 {} + + /** + * + */ + static class TestClass19 {} + + /** + * + */ + static class TestClass20 {} +} http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java index 242b12d..8d4af19 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java @@ -65,6 +65,8 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + cfg.setFailureDetectionTimeout(20_000); + cfg.setConnectorConfiguration(null); cfg.setPeerClassLoadingEnabled(false); cfg.setTimeServerPortRange(200);