ignite-1.5 Fixed hang on metadata update inside put in atomic cache when 
topology read lock is held. Also fixed several test issues.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53ec76ff
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53ec76ff
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53ec76ff

Branch: refs/heads/ignite-1537
Commit: 53ec76ffe65d5788fc1ffa32c2fba66222e51dcc
Parents: 66b33bc
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Dec 23 15:06:48 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Dec 23 15:06:48 2015 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  39 +-
 .../processors/cache/GridCacheAdapter.java      |  47 ++-
 .../processors/cache/GridCacheProxyImpl.java    |  29 ++
 .../cache/GridCacheSharedContext.java           |  10 +-
 .../processors/cache/IgniteCacheProxy.java      |  35 ++
 .../processors/cache/IgniteInternalCache.java   |  26 ++
 .../binary/CacheObjectBinaryProcessorImpl.java  |   4 +-
 .../distributed/dht/GridDhtLockFuture.java      |   2 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  95 +++--
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 150 ++++---
 .../colocated/GridDhtColocatedLockFuture.java   |  11 +-
 .../distributed/near/GridNearLockFuture.java    |  11 +-
 ...arOptimisticSerializableTxPrepareFuture.java |   5 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   5 +-
 ...ridNearOptimisticTxPrepareFutureAdapter.java |  12 +-
 .../transactions/IgniteTxLocalAdapter.java      |   2 +
 .../cache/transactions/IgniteTxManager.java     |  61 ++-
 .../datastreamer/DataStreamProcessor.java       |  12 +-
 .../ignite/internal/util/lang/GridFunc.java     |   1 +
 .../test/config/websession/example-cache.xml    |   9 +-
 ...niteClientReconnectFailoverAbstractTest.java |   3 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |  22 +-
 .../cache/GridCacheAbstractSelfTest.java        |   3 +-
 ...yMetadataUpdateChangingTopologySelfTest.java |   7 +-
 ...niteBinaryMetadataUpdateNodeRestartTest.java | 411 +++++++++++++++++++
 .../distributed/IgniteCacheManyClientsTest.java |   2 +
 ...ContinuousQueryFailoverAbstractSelfTest.java | 128 +++---
 ...ridCacheContinuousQueryAbstractSelfTest.java |   3 +
 .../service/ClosureServiceClientsNodesTest.java |  22 +-
 .../GridServiceProcessorStopSelfTest.java       |  21 +-
 ...cpCommunicationSpiMultithreadedSelfTest.java |  21 +
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java |  14 +-
 .../testframework/GridSpiTestContext.java       |  18 +-
 .../cache/websession/WebSessionFilter.java      |  82 ++--
 .../cache/websession/WebSessionListener.java    |  25 +-
 .../internal/websession/WebSessionSelfTest.java |   2 -
 36 files changed, 1023 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index bf7c7e4..42f8dae 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -666,6 +666,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      *
      * @param plc Policy.
      * @return Execution pool.
+     * @throws IgniteCheckedException If failed.
      */
     private Executor pool(byte plc) throws IgniteCheckedException {
         switch (plc) {
@@ -767,6 +768,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param msg Message.
      * @param plc Execution policy.
      * @param msgC Closure to call when message processing finished.
+     * @throws IgniteCheckedException If failed.
      */
     private void processRegularMessage(
         final UUID nodeId,
@@ -824,6 +826,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param msg Ordered message.
      * @param plc Execution policy.
      * @param msgC Closure to call when message processing finished ({@code 
null} for sync processing).
+     * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
     private void processOrderedMessage(
@@ -1029,7 +1032,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param ordered Ordered flag.
      * @param timeout Timeout.
      * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     private void send(
@@ -1041,7 +1044,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         boolean ordered,
         long timeout,
         boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackClosure
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
         assert node != null;
         assert topic != null;
@@ -1062,8 +1065,8 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             else
                 processRegularMessage0(ioMsg, locNodeId);
 
-            if (ackClosure != null)
-                ackClosure.apply(null);
+            if (ackC != null)
+                ackC.apply(null);
         }
         else {
             if (topicOrd < 0)
@@ -1071,7 +1074,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
             try {
                 if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi)
-                    
((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, 
ackClosure);
+                    
((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, 
ackC);
                 else
                     getSpi().sendMessage(node, ioMsg);
             }
@@ -1197,12 +1200,12 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
-        IgniteInClosure<IgniteException> ackClosure) throws 
IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, 
ackClosure);
+        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC);
     }
 
     /**
@@ -1233,12 +1236,12 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(ClusterNode node, Object topic, Message msg, byte plc, 
IgniteInClosure<IgniteException> ackClosure)
+    public void send(ClusterNode node, Object topic, Message msg, byte plc, 
IgniteInClosure<IgniteException> ackC)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false, ackClosure);
+        send(node, topic, -1, msg, plc, false, 0, false, ackC);
     }
 
     /**
@@ -1280,7 +1283,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @param timeout Timeout to keep a message on receiving queue.
      * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     public void sendOrderedMessage(
@@ -1290,11 +1293,11 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         byte plc,
         long timeout,
         boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackClosure
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
ackClosure);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
ackC);
     }
 
      /**
@@ -1385,6 +1388,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to subscribe to.
      * @param p Message predicate.
      */
+    @SuppressWarnings("unchecked")
     public void addUserMessageListener(@Nullable final Object topic, @Nullable 
final IgniteBiPredicate<UUID, ?> p) {
         if (p != null) {
             try {
@@ -1406,6 +1410,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to unsubscribe from.
      * @param p Message predicate.
      */
+    @SuppressWarnings("unchecked")
     public void removeUserMessageListener(@Nullable Object topic, 
IgniteBiPredicate<UUID, ?> p) {
         try {
             removeMessageListener(TOPIC_COMM_USER,
@@ -1423,7 +1428,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @param timeout Timeout to keep a message on receiving queue.
      * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     public void sendOrderedMessage(
@@ -1433,7 +1438,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         byte plc,
         long timeout,
         boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackClosure
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
@@ -1442,7 +1447,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node 
(has node left grid?): " + nodeId);
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
ackClosure);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
ackC);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index cc4e962..5d4c386 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2077,8 +2077,32 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public <T> EntryProcessorResult<T> invoke(@Nullable 
AffinityTopologyVersion topVer,
+        K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) throws IgniteCheckedException {
+        return invoke0(topVer, key, entryProcessor, args);
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> EntryProcessorResult<T> invoke(final K key,
         final EntryProcessor<K, V, T> entryProcessor,
+        final Object... args) throws IgniteCheckedException {
+        return invoke0(null, key, entryProcessor, args);
+    }
+
+    /**
+     * @param topVer Locked topology version.
+     * @param key Key.
+     * @param entryProcessor Entry processor.
+     * @param args Entry processor arguments.
+     * @return Invoke result.
+     * @throws IgniteCheckedException If failed.
+     */
+    private <T> EntryProcessorResult<T> invoke0(
+        @Nullable final AffinityTopologyVersion topVer,
+        final K key,
+        final EntryProcessor<K, V, T> entryProcessor,
         final Object... args)
         throws IgniteCheckedException {
         A.notNull(key, "key", entryProcessor, "entryProcessor");
@@ -2089,8 +2113,15 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
             @Nullable @Override public EntryProcessorResult<T> 
op(IgniteTxLocalAdapter tx)
                 throws IgniteCheckedException {
-                IgniteInternalFuture<GridCacheReturn> fut =
-                    tx.invokeAsync(ctx, key, (EntryProcessor<K, V, 
Object>)entryProcessor, args);
+                assert topVer == null || tx.implicit();
+
+                if (topVer != null)
+                    tx.topologyVersion(topVer);
+
+                IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx,
+                    key,
+                    (EntryProcessor<K, V, Object>)entryProcessor,
+                    args);
 
                 Map<K, EntryProcessorResult<T>> resMap = fut.get().value();
 
@@ -2324,16 +2355,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         });
     }
 
-    /**
-     * Tries to put value in cache. Will fail with {@link 
GridCacheTryPutFailedException}
-     * if topology exchange is in progress.
-     *
-     * @param key Key.
-     * @param val value.
-     * @return Old value.
-     * @throws IgniteCheckedException In case of error.
-     */
-    @Nullable public V tryPutIfAbsent(K key, V val) throws 
IgniteCheckedException {
+    /** {@inheritDoc} */
+    @Nullable @Override public V tryPutIfAbsent(K key, V val) throws 
IgniteCheckedException {
         // Supported only in ATOMIC cache.
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index d1d93d8..8ffd273 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityProxy;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -1231,6 +1232,34 @@ public class GridCacheProxyImpl<K, V> implements 
IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public V tryPutIfAbsent(K key, V val) throws 
IgniteCheckedException {
+        CacheOperationContext prev = gate.enter(opCtx);
+
+        try {
+            return delegate.tryPutIfAbsent(key, val);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <T> EntryProcessorResult<T> invoke(
+        AffinityTopologyVersion topVer,
+        K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) throws IgniteCheckedException {
+        CacheOperationContext prev = gate.enter(opCtx);
+
+        try {
+            return delegate.invoke(topVer, key, entryProcessor, args);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void removeAll()
         throws IgniteCheckedException {
         CacheOperationContext prev = gate.enter(opCtx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 5ed1df9..2221d3b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -91,9 +91,6 @@ public class GridCacheSharedContext<K, V> {
     /** Tx metrics. */
     private volatile TransactionMetricsAdapter txMetrics;
 
-    /** Preloaders start future. */
-    private IgniteInternalFuture<Object> preloadersStartFut;
-
     /** Store session listeners. */
     private Collection<CacheStoreSessionListener> storeSesLsnrs;
 
@@ -578,12 +575,7 @@ public class GridCacheSharedContext<K, V> {
     @Nullable public AffinityTopologyVersion 
lockedTopologyVersion(IgniteInternalTx ignore) {
         long threadId = Thread.currentThread().getId();
 
-        IgniteInternalTx tx = txMgr.anyActiveThreadTx(threadId, ignore);
-
-        AffinityTopologyVersion topVer = null;
-
-        if (tx != null && tx.topologyVersionSnapshot() != null)
-            topVer = tx.topologyVersionSnapshot();
+        AffinityTopologyVersion topVer = txMgr.lockedTopologyVersion(threadId, 
ignore);
 
         if (topVer == null)
             topVer = mvccMgr.lastExplicitLockTopologyVersion(threadId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 271a2cf..27a7587 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.AsyncSupportAdapter;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
@@ -1483,6 +1484,40 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
         return invoke(key, (EntryProcessor<K, V, T>)entryProcessor, args);
     }
 
+    /**
+     * @param topVer Locked topology version.
+     * @param key Key.
+     * @param entryProcessor Entry processor.
+     * @param args Arguments.
+     * @return Invoke result.
+     */
+    public <T> T invoke(@Nullable AffinityTopologyVersion topVer,
+        K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        try {
+            GridCacheGateway<K, V> gate = this.gate;
+
+            CacheOperationContext prev = onEnter(gate, opCtx);
+
+            try {
+                if (isAsync())
+                    throw new UnsupportedOperationException();
+                else {
+                    EntryProcessorResult<T> res = delegate.invoke(topVer, key, 
entryProcessor, args);
+
+                    return res != null ? res.get() : null;
+                }
+            }
+            finally {
+                onLeave(gate, prev);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? 
extends K> keys,
         EntryProcessor<K, V, T> entryProcessor,

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index 186de68..433290c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -1863,4 +1864,29 @@ public interface IgniteInternalCache<K, V> extends 
Iterable<Cache.Entry<K, V>> {
      * @throws IgniteCheckedException If failed.
      */
     public V getTopologySafe(K key) throws IgniteCheckedException;
+
+    /**
+     * Tries to put value in cache. Will fail with {@link 
GridCacheTryPutFailedException}
+     * if topology exchange is in progress.
+     *
+     * @param key Key.
+     * @param val value.
+     * @return Old value.
+     * @throws IgniteCheckedException In case of error.
+     */
+    @Nullable public V tryPutIfAbsent(K key, V val) throws 
IgniteCheckedException;
+
+    /**
+     * @param topVer Locked topology version.
+     * @param key Key.
+     * @param entryProcessor Entry processor.
+     * @param args Arguments.
+     * @return Invoke result.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public <T> EntryProcessorResult<T> invoke(
+        @Nullable AffinityTopologyVersion topVer,
+        K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index b335179..7586a42 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -489,7 +489,9 @@ public class CacheObjectBinaryProcessorImpl extends 
IgniteCacheObjectProcessorIm
             BinaryMetadata oldMeta = metaDataCache.localPeek(key);
             BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, 
newMeta0);
 
-            BinaryObjectException err = metaDataCache.invoke(key, new 
MetadataProcessor(mergedMeta));
+            AffinityTopologyVersion topVer = 
ctx.cache().context().lockedTopologyVersion(null);
+
+            BinaryObjectException err = metaDataCache.invoke(topVer, key, new 
MetadataProcessor(mergedMeta));
 
             if (err != null)
                 throw err;

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index f0d2e15..98711b8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -743,7 +743,7 @@ public final class GridDhtLockFuture extends 
GridCompoundIdentityFuture<Boolean>
         if (tx != null) {
             cctx.tm().txContext(tx);
 
-            set = cctx.tm().setTxTopologyHint(tx);
+            set = cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot());
         }
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 481317a..634a9ea 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -77,6 +77,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
 import 
org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
@@ -1240,7 +1241,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                 top.readLock();
 
                 try {
-                    if (topology().stopping()) {
+                    if (top.stopping()) {
                         res.addFailedKeys(keys, new 
IgniteCheckedException("Failed to perform cache operation " +
                             "(cache is stopped): " + name()));
 
@@ -1289,48 +1290,59 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
                         GridCacheReturn retVal = null;
 
-                        if (keys.size() > 1 &&                             // 
Several keys ...
-                            writeThrough() && !req.skipStore() &&          // 
and store is enabled ...
-                            !ctx.store().isLocal() &&                      // 
and this is not local store ...
-                            !ctx.dr().receiveEnabled()                     // 
and no DR.
-                        ) {
-                            // This method can only be used when there are no 
replicated entries in the batch.
-                            UpdateBatchResult updRes = updateWithBatch(node,
-                                hasNear,
-                                req,
-                                res,
-                                locked,
-                                ver,
-                                dhtFut,
-                                completionCb,
-                                ctx.isDrEnabled(),
-                                taskName,
-                                expiry,
-                                sndPrevVal);
+                        IgniteTxManager tm = ctx.tm();
 
-                            deleted = updRes.deleted();
-                            dhtFut = updRes.dhtFuture();
+                        // Needed for metadata cache transaction.
+                        boolean set = 
tm.setTxTopologyHint(req.topologyVersion());
 
-                            if (req.operation() == TRANSFORM)
-                                retVal = updRes.invokeResults();
+                        try {
+                            if (keys.size() > 1 &&                             
// Several keys ...
+                                writeThrough() && !req.skipStore() &&          
// and store is enabled ...
+                                !ctx.store().isLocal() &&                      
// and this is not local store ...
+                                !ctx.dr().receiveEnabled()                     
// and no DR.
+                                ) {
+                                // This method can only be used when there are 
no replicated entries in the batch.
+                                UpdateBatchResult updRes = 
updateWithBatch(node,
+                                    hasNear,
+                                    req,
+                                    res,
+                                    locked,
+                                    ver,
+                                    dhtFut,
+                                    completionCb,
+                                    ctx.isDrEnabled(),
+                                    taskName,
+                                    expiry,
+                                    sndPrevVal);
+
+                                deleted = updRes.deleted();
+                                dhtFut = updRes.dhtFuture();
+
+                                if (req.operation() == TRANSFORM)
+                                    retVal = updRes.invokeResults();
+                            }
+                            else {
+                                UpdateSingleResult updRes = updateSingle(node,
+                                    hasNear,
+                                    req,
+                                    res,
+                                    locked,
+                                    ver,
+                                    dhtFut,
+                                    completionCb,
+                                    ctx.isDrEnabled(),
+                                    taskName,
+                                    expiry,
+                                    sndPrevVal);
+
+                                retVal = updRes.returnValue();
+                                deleted = updRes.deleted();
+                                dhtFut = updRes.dhtFuture();
+                            }
                         }
-                        else {
-                            UpdateSingleResult updRes = updateSingle(node,
-                                hasNear,
-                                req,
-                                res,
-                                locked,
-                                ver,
-                                dhtFut,
-                                completionCb,
-                                ctx.isDrEnabled(),
-                                taskName,
-                                expiry,
-                                sndPrevVal);
-
-                            retVal = updRes.returnValue();
-                            deleted = updRes.deleted();
-                            dhtFut = updRes.dhtFuture();
+                        finally {
+                            if (set)
+                                tm.setTxTopologyHint(null);
                         }
 
                         if (retVal == null)
@@ -2782,8 +2794,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         if (log.isDebugEnabled())
             log.debug("Processing dht atomic update response [nodeId=" + 
nodeId + ", res=" + res + ']');
 
-        GridDhtAtomicUpdateFuture updateFut = 
(GridDhtAtomicUpdateFuture)ctx.mvcc().
-            atomicFuture(res.futureVersion());
+        GridDhtAtomicUpdateFuture updateFut = 
(GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
 
         if (updateFut != null)
             updateFut.onResult(nodeId, res);

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index eefdc73..3c86083 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -20,7 +20,6 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
@@ -47,8 +46,8 @@ import 
org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -288,7 +287,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
             // Cannot remap.
             remapCnt = 1;
 
-            state.map(topVer);
+            state.map(topVer, null);
         }
     }
 
@@ -415,7 +414,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
             cache.topology().readUnlock();
         }
 
-        state.map(topVer);
+        state.map(topVer, null);
     }
 
     /**
@@ -582,7 +581,9 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                     req = mappings != null ? mappings.get(nodeId) : null;
 
                 if (req != null && req.response() == null) {
-                    res = new GridNearAtomicUpdateResponse(cctx.cacheId(), 
nodeId, req.futureVersion(),
+                    res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+                        nodeId,
+                        req.futureVersion(),
                         cctx.deploymentEnabled());
 
                     ClusterTopologyCheckedException e = new 
ClusterTopologyCheckedException("Primary node left grid " +
@@ -603,6 +604,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
          * @param res Response.
          * @param nodeErr {@code True} if response was created on node failure.
          */
+        @SuppressWarnings("unchecked")
         void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean 
nodeErr) {
             GridNearAtomicUpdateRequest req;
 
@@ -774,7 +776,11 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                     return;
                 }
 
-                IgniteInternalFuture<AffinityTopologyVersion> fut = 
cctx.affinity().affinityReadyFuture(remapTopVer);
+                IgniteInternalFuture<AffinityTopologyVersion> fut =
+                    cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+
+                if (fut == null)
+                    fut = new GridFinishedFuture<>(remapTopVer);
 
                 fut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                     @Override public void apply(final 
IgniteInternalFuture<AffinityTopologyVersion> fut) {
@@ -783,7 +789,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                                 try {
                                     AffinityTopologyVersion topVer = fut.get();
 
-                                    map(topVer);
+                                    map(topVer, remapKeys);
                                 }
                                 catch (IgniteCheckedException e) {
                                     onDone(e);
@@ -819,8 +825,9 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
 
         /**
          * @param topVer Topology version.
+         * @param remapKeys Keys to remap.
          */
-        void map(AffinityTopologyVersion topVer) {
+        void map(AffinityTopologyVersion topVer, @Nullable 
Collection<KeyCacheObject> remapKeys) {
             Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
             if (F.isEmpty(topNodes)) {
@@ -832,68 +839,78 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
 
             Exception err = null;
             GridNearAtomicUpdateRequest singleReq0 = null;
-            Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
+            Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
 
             int size = keys.size();
 
-            synchronized (this) {
-                assert futVer == null : this;
-                assert this.topVer == AffinityTopologyVersion.ZERO : this;
+            GridCacheVersion futVer = cctx.versions().next(topVer);
 
-                resCnt = 0;
+            GridCacheVersion updVer;
 
-                this.topVer = topVer;
+            // Assign version on near node in CLOCK ordering mode even if 
fastMap is false.
+            if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+                updVer = this.updVer;
 
-                futVer = cctx.versions().next(topVer);
+                if (updVer == null) {
+                    updVer = cctx.versions().next(topVer);
 
-                if (storeFuture()) {
-                    if (!cctx.mvcc().addAtomicFuture(futVer, 
GridNearAtomicUpdateFuture.this)) {
-                        assert isDone() : GridNearAtomicUpdateFuture.this;
-
-                        return;
-                    }
+                    if (log.isDebugEnabled())
+                        log.debug("Assigned fast-map version for update on 
near node: " + updVer);
                 }
+            }
+            else
+                updVer = null;
 
-                // Assign version on near node in CLOCK ordering mode even if 
fastMap is false.
-                if (updVer == null)
-                    updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK 
? cctx.versions().next(topVer) : null;
+            try {
+                if (size == 1 && !fastMap) {
+                    assert remapKeys == null || remapKeys.size() == 1;
 
-                if (updVer != null && log.isDebugEnabled())
-                    log.debug("Assigned fast-map version for update on near 
node: " + updVer);
+                    singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+                }
+                else {
+                    Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = 
mapUpdate(topNodes,
+                        topVer,
+                        futVer,
+                        updVer,
+                        remapKeys);
+
+                    if (pendingMappings.size() == 1)
+                        singleReq0 = F.firstValue(pendingMappings);
+                    else {
+                        if (syncMode == PRIMARY_SYNC) {
+                            mappings0 = U.newHashMap(pendingMappings.size());
 
-                try {
-                    if (size == 1 && !fastMap) {
-                        assert remapKeys == null || remapKeys.size() == 1;
+                            for (GridNearAtomicUpdateRequest req : 
pendingMappings.values()) {
+                                if (req.hasPrimary())
+                                    mappings0.put(req.nodeId(), req);
+                            }
+                        }
+                        else
+                            mappings0 = pendingMappings;
 
-                        singleReq0 = singleReq = mapSingleUpdate();
+                        assert !mappings0.isEmpty() || size == 0 : 
GridNearAtomicUpdateFuture.this;
                     }
-                    else {
-                        pendingMappings = mapUpdate(topNodes);
+                }
 
-                        if (pendingMappings.size() == 1)
-                            singleReq0 = singleReq = 
F.firstValue(pendingMappings);
-                        else {
-                            if (syncMode == PRIMARY_SYNC) {
-                                mappings = 
U.newHashMap(pendingMappings.size());
+                synchronized (this) {
+                    assert this.futVer == null : this;
+                    assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
-                                for (GridNearAtomicUpdateRequest req : 
pendingMappings.values()) {
-                                    if (req.hasPrimary())
-                                        mappings.put(req.nodeId(), req);
-                                }
-                            }
-                            else
-                                mappings = new HashMap<>(pendingMappings);
+                    this.topVer = topVer;
+                    this.updVer = updVer;
+                    this.futVer = futVer;
 
-                            assert !mappings.isEmpty() || size == 0 : 
GridNearAtomicUpdateFuture.this;
-                        }
-                    }
+                    resCnt = 0;
 
-                    remapKeys = null;
-                }
-                catch (Exception e) {
-                    err = e;
+                    singleReq = singleReq0;
+                    mappings = mappings0;
+
+                    this.remapKeys = null;
                 }
             }
+            catch (Exception e) {
+                err = e;
+            }
 
             if (err != null) {
                 onDone(err);
@@ -901,16 +918,24 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                 return;
             }
 
+            if (storeFuture()) {
+                if (!cctx.mvcc().addAtomicFuture(futVer, 
GridNearAtomicUpdateFuture.this)) {
+                    assert isDone() : GridNearAtomicUpdateFuture.this;
+
+                    return;
+                }
+            }
+
             // Optimize mapping for single key.
             if (singleReq0 != null)
                 mapSingle(singleReq0.nodeId(), singleReq0);
             else {
-                assert pendingMappings != null;
+                assert mappings0 != null;
 
                 if (size == 0)
                     onDone(new GridCacheReturn(cctx, true, true, null, true));
                 else
-                    doUpdate(pendingMappings);
+                    doUpdate(mappings0);
             }
         }
 
@@ -958,10 +983,18 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
 
         /**
          * @param topNodes Cache nodes.
+         * @param topVer Topology version.
+         * @param futVer Future version.
+         * @param updVer Update version.
+         * @param remapKeys Keys to remap.
          * @return Mapping.
          * @throws Exception If failed.
          */
-        private Map<UUID, GridNearAtomicUpdateRequest> 
mapUpdate(Collection<ClusterNode> topNodes) throws Exception {
+        private Map<UUID, GridNearAtomicUpdateRequest> 
mapUpdate(Collection<ClusterNode> topNodes,
+            AffinityTopologyVersion topVer,
+            GridCacheVersion futVer,
+            @Nullable GridCacheVersion updVer,
+            @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
             Iterator<?> it = null;
 
             if (vals != null)
@@ -999,7 +1032,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                         throw new NullPointerException("Null value.");
                 }
                 else if (conflictPutVals != null) {
-                    GridCacheDrInfo conflictPutVal =  conflictPutValsIt.next();
+                    GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
 
                     val = conflictPutVal.value();
                     conflictVer = conflictPutVal.version();
@@ -1082,10 +1115,15 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
         }
 
         /**
+         * @param topVer Topology version.
+         * @param futVer Future version.
+         * @param updVer Update version.
          * @return Request.
          * @throws Exception If failed.
          */
-        private GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception 
{
+        private GridNearAtomicUpdateRequest 
mapSingleUpdate(AffinityTopologyVersion topVer,
+            GridCacheVersion futVer,
+            @Nullable GridCacheVersion updVer) throws Exception {
             Object key = F.first(keys);
 
             Object val;

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 22b329c..a5f5286 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -48,7 +48,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -596,12 +595,8 @@ public final class GridDhtColocatedLockFuture extends 
GridCompoundIdentityFuture
         AffinityTopologyVersion topVer = 
cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
         // If there is another system transaction in progress, use it's 
topology version to prevent deadlock.
-        if (topVer == null && tx != null && tx.system()) {
-            IgniteInternalTx tx0 = 
cctx.tm().anyActiveThreadTx(Thread.currentThread().getId(), tx);
-
-            if (tx0 != null)
-                topVer = tx0.topologyVersionSnapshot();
-        }
+        if (topVer == null && tx != null && tx.system())
+            topVer = 
cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), tx);
 
         if (topVer != null && tx != null)
             tx.topologyVersion(topVer);
@@ -980,7 +975,7 @@ public final class GridDhtColocatedLockFuture extends 
GridCompoundIdentityFuture
      * @throws IgniteCheckedException If failed.
      */
     private void proceedMapping() throws IgniteCheckedException {
-        boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx);
+        boolean set = tx != null && 
cctx.shared().tm().setTxTopologyHint(tx.topologyVersionSnapshot());
 
         try {
             proceedMapping0();

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 23e0f6b..55c5ab6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -46,7 +46,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
-import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -723,12 +722,8 @@ public final class GridNearLockFuture extends 
GridCompoundIdentityFuture<Boolean
         AffinityTopologyVersion topVer = 
cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
         // If there is another system transaction in progress, use it's 
topology version to prevent deadlock.
-        if (topVer == null && tx != null && tx.system()) {
-            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx);
-
-            if (tx0 != null)
-                topVer = tx0.topologyVersionSnapshot();
-        }
+        if (topVer == null && tx != null && tx.system())
+            topVer = cctx.tm().lockedTopologyVersion(threadId, tx);
 
         if (topVer != null && tx != null)
             tx.topologyVersion(topVer);
@@ -1098,7 +1093,7 @@ public final class GridNearLockFuture extends 
GridCompoundIdentityFuture<Boolean
      * @throws IgniteCheckedException If failed.
      */
     private void proceedMapping() throws IgniteCheckedException {
-        boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx);
+        boolean set = tx != null && 
cctx.shared().tm().setTxTopologyHint(tx.topologyVersionSnapshot());
 
         try {
             proceedMapping0();

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index f52b3fc..37dc564 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -84,7 +84,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
      * @param cctx Context.
      * @param tx Transaction.
      */
-    public 
GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx, 
GridNearTxLocal tx) {
+    public 
GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx,
+        GridNearTxLocal tx) {
         super(cctx, tx);
 
         assert tx.optimistic() && tx.serializable() : tx;
@@ -304,7 +305,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
             return;
         }
 
-        boolean set = cctx.tm().setTxTopologyHint(tx);
+        boolean set = 
cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot());
 
         try {
             prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked);

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 2ce14af..a9f158a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -72,7 +72,8 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
      * @param cctx Context.
      * @param tx Transaction.
      */
-    public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, 
GridNearTxLocal tx) {
+    public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx,
+        GridNearTxLocal tx) {
         super(cctx, tx);
 
         assert tx.optimistic() && !tx.serializable() : tx;
@@ -405,7 +406,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
         if (isDone())
             return;
 
-        boolean set = cctx.tm().setTxTopologyHint(tx);
+        boolean set = 
cctx.tm().setTxTopologyHint(tx.topologyVersionSnapshot());
 
         try {
             assert !m.empty();

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index b3eab34..fa7020b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -23,7 +23,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -41,7 +40,8 @@ public abstract class 
GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
      * @param cctx Context.
      * @param tx Transaction.
      */
-    public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext 
cctx, GridNearTxLocal tx) {
+    public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext 
cctx,
+        GridNearTxLocal tx) {
         super(cctx, tx);
 
         assert tx.optimistic() : tx;
@@ -55,12 +55,8 @@ public abstract class 
GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
         AffinityTopologyVersion topVer = 
cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
         // If there is another system transaction in progress, use it's 
topology version to prevent deadlock.
-        if (topVer == null && tx != null && tx.system()) {
-            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx);
-
-            if (tx0 != null)
-                topVer = tx0.topologyVersionSnapshot();
-        }
+        if (topVer == null && tx != null && tx.system())
+            topVer = cctx.tm().lockedTopologyVersion(threadId, tx);
 
         if (topVer != null) {
             tx.topologyVersion(topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 720832e..70c79a5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -3203,8 +3203,10 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
     }
 
     /**
+     * @param cacheCtx Cache context.
      * @param loadFut Missing keys load future.
      * @param ret Future result.
+     * @param keepBinary Keep binary flag.
      * @return Future.
      */
     private IgniteInternalFuture optimisticPutFuture(

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index d2b803a..d384e4e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -114,8 +114,8 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     /** Committing transactions. */
     private final ThreadLocal<IgniteInternalTx> threadCtx = new 
ThreadLocal<>();
 
-    /** Transaction which topology version should be used when mapping 
internal tx. */
-    private final ThreadLocal<IgniteInternalTx> txTopology = new 
ThreadLocal<>();
+    /** Topology version should be used when mapping internal tx. */
+    private final ThreadLocal<AffinityTopologyVersion> txTop = new 
ThreadLocal<>();
 
     /** Per-thread transaction map. */
     private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap();
@@ -130,7 +130,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     private final ConcurrentMap<GridCacheVersion, IgniteInternalTx> nearIdMap 
= newMap();
 
     /** TX handler. */
-    private IgniteTxHandler txHandler;
+    private IgniteTxHandler txHnd;
 
     /** Committed local transactions. */
     private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> 
completedVersSorted =
@@ -197,7 +197,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     @Override protected void start0() throws IgniteCheckedException {
         txFinishSync = new GridCacheTxFinishSync<>(cctx);
 
-        txHandler = new IgniteTxHandler(cctx);
+        txHnd = new IgniteTxHandler(cctx);
     }
 
     /** {@inheritDoc} */
@@ -212,7 +212,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * @return TX handler.
      */
     public IgniteTxHandler txHandler() {
-        return txHandler;
+        return txHnd;
     }
 
     /**
@@ -607,13 +607,17 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     /**
      * @param threadId Thread ID.
      * @param ignore Transaction to ignore.
-     * @return Any transaction associated with the current thread.
+     * @return Not null topology version if current thread holds lock 
preventing topology change.
      */
-    public IgniteInternalTx anyActiveThreadTx(long threadId, IgniteInternalTx 
ignore) {
+    @Nullable public AffinityTopologyVersion lockedTopologyVersion(long 
threadId, IgniteInternalTx ignore) {
         IgniteInternalTx tx = threadMap.get(threadId);
 
-        if (tx != null && tx.topologyVersionSnapshot() != null)
-            return tx;
+        if (tx != null) {
+            AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
+
+            if (topVer != null)
+                return topVer;
+        }
 
         for (GridCacheContext cacheCtx : 
cctx.cache().context().cacheContexts()) {
             if (!cacheCtx.systemTx())
@@ -621,22 +625,27 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
 
             tx = sysThreadMap.get(new TxThreadKey(threadId, 
cacheCtx.cacheId()));
 
-            if (tx != null && tx != ignore && tx.topologyVersionSnapshot() != 
null)
-                return tx;
+            if (tx != null && tx != ignore) {
+                AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
+
+                if (topVer != null)
+                    return topVer;
+            }
         }
 
-        return txTopology.get();
+        return txTop.get();
     }
 
     /**
-     * @param tx Transaction.
+     * @param topVer Locked topology version.
+     * @return {@code True} if topology hint was set.
      */
-    public boolean setTxTopologyHint(IgniteInternalTx tx) {
-        if (tx == null)
-            txTopology.remove();
+    public boolean setTxTopologyHint(@Nullable AffinityTopologyVersion topVer) 
{
+        if (topVer == null)
+            txTop.remove();
         else {
-            if (txTopology.get() == null) {
-                txTopology.set(tx);
+            if (txTop.get() == null) {
+                txTop.set(topVer);
 
                 return true;
             }
@@ -1807,8 +1816,10 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
             this.evtNodeId = evtNodeId;
         }
 
-        /** {@inheritDoc} */
-        @Override public void onTimeout() {
+        /**
+         *
+         */
+        private void onTimeout0() {
             try {
                 cctx.kernalContext().gateway().readLock();
             }
@@ -1861,6 +1872,16 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
                 cctx.kernalContext().gateway().readUnlock();
             }
         }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            // Should not block timeout thread.
+            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                @Override public void run() {
+                    onTimeout0();
+                }
+            });
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index a2aab77..da39209 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -62,6 +62,9 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
     /** Marshaller. */
     private final Marshaller marsh;
 
+    /** */
+    private byte[] marshErrBytes;
+
     /**
      * @param ctx Kernal context.
      */
@@ -86,6 +89,9 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
         if (ctx.config().isDaemon())
             return;
 
+        marshErrBytes = marsh.marshal(new IgniteCheckedException("Failed to 
marshal response error, " +
+            "see node log for details."));
+
         flusher = new IgniteThread(new GridWorker(ctx.gridName(), 
"grid-data-loader-flusher", log) {
             @Override protected void body() throws InterruptedException {
                 while (!isCancelled()) {
@@ -324,10 +330,10 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
         try {
             errBytes = err != null ? marsh.marshal(err) : null;
         }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to marshal message.", e);
+        catch (Exception e) {
+            U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" 
+ e + ']', e);
 
-            return;
+            errBytes = marshErrBytes;
         }
 
         DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, 
forceLocDep);

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 8d5a8e7..8eeca6b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -3408,6 +3408,7 @@ public class GridFunc {
      * @return First element in given collection for which predicate evaluates 
to
      *      {@code true} - or {@code null} if such element cannot be found.
      */
+    @SafeVarargs
     @Nullable public static <V> V find(Iterable<? extends V> c, @Nullable V 
dfltVal,
         @Nullable IgnitePredicate<? super V>... p) {
         A.notNull(c, "c");

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/config/websession/example-cache.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/websession/example-cache.xml 
b/modules/core/src/test/config/websession/example-cache.xml
index d5bfeb7..0cc0e1e 100644
--- a/modules/core/src/test/config/websession/example-cache.xml
+++ b/modules/core/src/test/config/websession/example-cache.xml
@@ -130,14 +130,7 @@
         <property name="discoverySpi">
             <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                 <property name="ipFinder">
-                    <!--
-                        Ignite provides several options for automatic 
discovery that can be used
-                        instead os static IP based discovery. For information 
on all options refer
-                        to our documentation: 
http://apacheignite.readme.io/docs/cluster-config
-                    -->
-                    <!-- Uncomment static IP finder to enable static-based 
discovery of initial nodes. -->
-                    <!--<bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
-                    <bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+                    <bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                         <property name="addresses">
                             <list>
                                 <!-- In distributed environment, replace with 
actual host IP address. -->

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
index f050c72..7e217b7 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
@@ -117,7 +117,8 @@ public abstract class 
IgniteClientReconnectFailoverAbstractTest extends IgniteCl
                     }
 
                     return null;
-                } catch (Throwable e) {
+                }
+                catch (Throwable e) {
                     log.error("Unexpected error in operation thread: " + e, e);
 
                     stop.set(true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 5b294cc..0d9c541 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -3277,9 +3277,9 @@ public abstract class GridCacheAbstractFullApiSelfTest 
extends GridCacheAbstract
      * @throws Exception If failed.
      */
     public void testPeekExpired() throws Exception {
-        IgniteCache<String, Integer> c = jcache();
+        final IgniteCache<String, Integer> c = jcache();
 
-        String key = primaryKeysForCache(c, 1).get(0);
+        final String key = primaryKeysForCache(c, 1).get(0);
 
         info("Using key: " + key);
 
@@ -3295,6 +3295,12 @@ public abstract class GridCacheAbstractFullApiSelfTest 
extends GridCacheAbstract
 
         Thread.sleep(ttl + 100);
 
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return peek(c, key) == null;
+            }
+        }, 2000);
+
         assert peek(c, key) == null;
 
         assert c.localSize() == 0 : "Cache is not empty.";
@@ -3307,9 +3313,9 @@ public abstract class GridCacheAbstractFullApiSelfTest 
extends GridCacheAbstract
      */
     public void testPeekExpiredTx() throws Exception {
         if (txShouldBeUsed()) {
-            IgniteCache<String, Integer> c = jcache();
+            final IgniteCache<String, Integer> c = jcache();
 
-            String key = "1";
+            final String key = "1";
             int ttl = 500;
 
             try (Transaction tx = grid(0).transactions().txStart()) {
@@ -3320,9 +3326,13 @@ public abstract class GridCacheAbstractFullApiSelfTest 
extends GridCacheAbstract
                 tx.commit();
             }
 
-            Thread.sleep(ttl + 100);
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return peek(c, key) == null;
+                }
+            }, 2000);
 
-            assertNull(c.localPeek(key, ONHEAP));
+            assertNull(peek(c, key));
 
             assert c.localSize() == 0;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
index 52fbf4c..b3d1384 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
@@ -416,9 +416,8 @@ public abstract class GridCacheAbstractSelfTest extends 
GridCommonAbstractTest {
      * @param cache Cache projection.
      * @param key Key.
      * @return Value.
-     * @throws Exception If failed.
      */
-    @Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) throws 
Exception {
+    @Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) {
         return offheapTiered(cache) ? cache.localPeek(key, CachePeekMode.SWAP, 
CachePeekMode.OFFHEAP) :
             cache.localPeek(key, CachePeekMode.ONHEAP);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
index e53650c..c95c586 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
@@ -152,7 +152,7 @@ public class 
IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
         private List<Object> recordedMsgs = new ArrayList<>();
 
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg, 
IgniteInClosure<IgniteException> ackClosure)
+        @Override public void sendMessage(ClusterNode node, Message msg, 
IgniteInClosure<IgniteException> ackC)
             throws IgniteSpiException {
             if (msg instanceof GridIoMessage) {
                 Object msg0 = ((GridIoMessage)msg).message();
@@ -174,7 +174,7 @@ public class 
IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
                 }
             }
 
-            super.sendMessage(node, msg, ackClosure);
+            super.sendMessage(node, msg, ackC);
         }
 
         /**
@@ -238,6 +238,9 @@ public class 
IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
         }
     }
 
+    /**
+     *
+     */
     private static class TestValue {
         /** Field1. */
         private String field1;

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
new file mode 100644
index 0000000..814fb08
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.CacheException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteBinaryMetadataUpdateNodeRestartTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String ATOMIC_CACHE = "atomicCache";
+
+    /** */
+    private static final String TX_CACHE = "txCache";
+
+    /** */
+    private static final int SRVS = 3;
+
+    /** */
+    private static final int CLIENTS = 1;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        cfg.setMarshaller(null);
+
+        CacheConfiguration ccfg1 = cacheConfiguration(TX_CACHE, TRANSACTIONAL);
+        CacheConfiguration ccfg2 = cacheConfiguration(ATOMIC_CACHE, ATOMIC);
+        
+        cfg.setCacheConfiguration(ccfg1, ccfg2);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @param name Cache name.
+     * @param atomicityMode Cache atomicity mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String name, 
CacheAtomicityMode atomicityMode) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(1);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setName(name);
+        ccfg.setAtomicityMode(atomicityMode);
+
+        return ccfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeRestart() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            log.info("Iteration: " + i);
+
+            client = false;
+
+            startGridsMultiThreaded(SRVS);
+
+            client = true;
+
+            startGrid(SRVS);
+
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            try {
+                IgniteInternalFuture<?> restartFut = 
GridTestUtils.runAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        while (!stop.get()) {
+                            log.info("Start node.");
+
+                            startGrid(SRVS + CLIENTS);
+
+                            log.info("Stop node.");
+
+                            stopGrid(SRVS + CLIENTS);
+                        }
+
+                        return null;
+                    }
+                }, "restart-thread");
+
+                final AtomicInteger idx = new AtomicInteger();
+
+                IgniteInternalFuture<?> fut = 
GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        int threadIdx = idx.getAndIncrement();
+
+                        int node = threadIdx % (SRVS + CLIENTS);
+
+                        Ignite ignite = ignite(node);
+
+                        log.info("Started thread: " + ignite.name());
+
+                        Thread.currentThread().setName("update-thread-" + 
threadIdx + "-" + ignite.name());
+
+                        IgniteCache<Object, Object> cache1 = 
ignite.cache(ATOMIC_CACHE);
+                        IgniteCache<Object, Object> cache2 = 
ignite.cache(TX_CACHE);
+
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                        while (!stop.get()) {
+                            try {
+                                cache1.put(new TestClass1(true), 
create(rnd.nextInt(20) + 1));
+
+                                cache1.invoke(new TestClass1(true), new 
TestEntryProcessor(rnd.nextInt(20) + 1));
+
+                                cache2.put(new TestClass1(true), 
create(rnd.nextInt(20) + 1));
+
+                                cache2.invoke(new TestClass1(true), new 
TestEntryProcessor(rnd.nextInt(20) + 1));
+                            }
+                            catch (CacheException | IgniteException e) {
+                                log.info("Error: " + e);
+
+                                if (X.hasCause(e, 
ClusterTopologyException.class)) {
+                                    ClusterTopologyException cause = 
X.cause(e, ClusterTopologyException.class);
+
+                                    if (cause.retryReadyFuture() != null)
+                                        cause.retryReadyFuture().get();
+                                }
+                            }
+                        }
+
+                        return null;
+                    }
+                }, 10, "update-thread");
+
+                U.sleep(5_000);
+
+                stop.set(true);
+
+                restartFut.get();
+
+                fut.get();
+            }
+            finally {
+                stop.set(true);
+
+                stopAllGrids();
+            }
+        }
+    }
+
+    /**
+     * @param id Class ID.
+     * @return Test class instance.
+     */
+    private static Object create(int id) {
+        switch (id) {
+            case 1: return new TestClass1(true);
+
+            case 2: return new TestClass2();
+
+            case 3: return new TestClass3();
+
+            case 4: return new TestClass4();
+
+            case 5: return new TestClass5();
+
+            case 6: return new TestClass6();
+
+            case 7: return new TestClass7();
+
+            case 8: return new TestClass8();
+
+            case 9: return new TestClass9();
+
+            case 10: return new TestClass10();
+
+            case 11: return new TestClass11();
+
+            case 12: return new TestClass12();
+
+            case 13: return new TestClass13();
+
+            case 14: return new TestClass14();
+
+            case 15: return new TestClass15();
+
+            case 16: return new TestClass16();
+
+            case 17: return new TestClass17();
+
+            case 18: return new TestClass18();
+
+            case 19: return new TestClass19();
+
+            case 20: return new TestClass20();
+        }
+
+        fail();
+
+        return null;
+    }
+
+    /**
+     *
+     */
+    static class TestEntryProcessor implements CacheEntryProcessor<Object, 
Object, Object> {
+        /** */
+        private int id;
+
+        /**
+         * @param id Value id.
+         */
+        public TestEntryProcessor(int id) {
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Object, Object> entry, 
Object... args) {
+            entry.setValue(create(id));
+
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    static class TestClass1 {
+        /** */
+        int val;
+
+        /**
+         * @param setVal Set value flag.
+         */
+        public TestClass1(boolean setVal) {
+            this.val = setVal ? ThreadLocalRandom.current().nextInt(10_000) : 
0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o) return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestClass1 that = (TestClass1)o;
+
+            return val == that.val;
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return val;
+        }
+    }
+
+    /**
+     *
+     */
+    static class TestClass2 {}
+
+    /**
+     *
+     */
+    static class TestClass3 {}
+
+    /**
+     *
+     */
+    static class TestClass4 {}
+
+    /**
+     *
+     */
+    static class TestClass5 {}
+
+    /**
+     *
+     */
+    static class TestClass6 {}
+
+    /**
+     *
+     */
+    static class TestClass7 {}
+
+    /**
+     *
+     */
+    static class TestClass8 {}
+
+    /**
+     *
+     */
+    static class TestClass9 {}
+
+    /**
+     *
+     */
+    static class TestClass10 {}
+
+    /**
+     *
+     */
+    static class TestClass11 {}
+
+    /**
+     *
+     */
+    static class TestClass12 {}
+
+    /**
+     *
+     */
+    static class TestClass13 {}
+
+    /**
+     *
+     */
+    static class TestClass14 {}
+
+    /**
+     *
+     */
+    static class TestClass15 {}
+
+    /**
+     *
+     */
+    static class TestClass16 {}
+
+    /**
+     *
+     */
+    static class TestClass17 {}
+
+    /**
+     *
+     */
+    static class TestClass18 {}
+
+    /**
+     *
+     */
+    static class TestClass19 {}
+
+    /**
+     *
+     */
+    static class TestClass20 {}
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index 242b12d..8d4af19 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -65,6 +65,8 @@ public class IgniteCacheManyClientsTest extends 
GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        cfg.setFailureDetectionTimeout(20_000);
+
         cfg.setConnectorConfiguration(null);
         cfg.setPeerClassLoadingEnabled(false);
         cfg.setTimeServerPortRange(200);

Reply via email to