ignite-3116 Cancel force keys futures on node stop

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04280189
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04280189
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04280189

Branch: refs/heads/ignite-3163
Commit: 04280189433bde02144ee0f831ed775f8c0980b2
Parents: 9f2bafb
Author: sboikov <[email protected]>
Authored: Mon May 16 09:08:14 2016 +0300
Committer: sboikov <[email protected]>
Committed: Mon May 16 09:08:14 2016 +0300

----------------------------------------------------------------------
 .../cache/distributed/dht/GridDhtGetFuture.java | 126 +++++++++----------
 .../dht/GridDhtTransactionalCacheAdapter.java   |  57 ++++++++-
 .../dht/atomic/GridDhtAtomicCache.java          |  51 +++++++-
 .../dht/preloader/GridDhtForceKeysFuture.java   |   6 +-
 .../dht/preloader/GridDhtPreloader.java         |  28 ++++-
 .../communication/tcp/TcpCommunicationSpi.java  |  31 ++---
 .../IgniteCacheBinaryObjectsScanSelfTest.java   |   2 +
 .../distributed/IgniteCacheCreatePutTest.java   |  11 +-
 8 files changed, 225 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index e86c885..4d4c303 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -43,6 +44,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.typedef.C2;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -166,9 +168,67 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
      * Initializes future.
      */
     void init() {
-        map(keys);
+        GridDhtFuture<Object> fut = 
cctx.dht().dhtPreloader().request(keys.keySet(), topVer);
+
+        if (!F.isEmpty(fut.invalidPartitions())) {
+            if (retries == null)
+                retries = new HashSet<>();
+
+            retries.addAll(fut.invalidPartitions());
+        }
+
+        fut.listen(new CI1<IgniteInternalFuture<Object>>() {
+            @Override public void apply(IgniteInternalFuture<Object> fut) {
+                try {
+                    fut.get();
+                }
+                catch (NodeStoppingException e) {
+                    return;
+                }
+                catch (IgniteCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to request keys from preloader 
[keys=" + keys + ", err=" + e + ']');
+
+                    onDone(e);
+
+                    return;
+                }
+
+                Map<KeyCacheObject, Boolean> mappedKeys = null;
+
+                // Assign keys to primary nodes.
+                for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) 
{
+                    int part = cctx.affinity().partition(key.getKey());
 
-        markInitialized();
+                    if (retries == null || !retries.contains(part)) {
+                        if (!map(key.getKey(), parts)) {
+                            if (retries == null)
+                                retries = new HashSet<>();
+
+                            retries.add(part);
+
+                            if (mappedKeys == null) {
+                                mappedKeys = U.newLinkedHashMap(keys.size());
+
+                                for (Map.Entry<KeyCacheObject, Boolean> key1 : 
keys.entrySet()) {
+                                    if (key1.getKey() == key.getKey())
+                                        break;
+
+                                    mappedKeys.put(key.getKey(), 
key1.getValue());
+                                }
+                            }
+                        }
+                        else if (mappedKeys != null)
+                            mappedKeys.put(key.getKey(), key.getValue());
+                    }
+                }
+
+                // Add new future.
+                add(getAsync(mappedKeys == null ? keys : mappedKeys));
+
+                markInitialized();
+            }
+        });
     }
 
     /** {@inheritDoc} */
@@ -204,68 +264,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     }
 
     /**
-     * @param keys Keys.
-     */
-    private void map(final Map<KeyCacheObject, Boolean> keys) {
-        GridDhtFuture<Object> fut = 
cctx.dht().dhtPreloader().request(keys.keySet(), topVer);
-
-        if (!F.isEmpty(fut.invalidPartitions())) {
-            if (retries == null)
-                retries = new HashSet<>();
-
-            retries.addAll(fut.invalidPartitions());
-        }
-
-        add(new GridEmbeddedFuture<>(
-            new IgniteBiClosure<Object, Exception, 
Collection<GridCacheEntryInfo>>() {
-                @Override public Collection<GridCacheEntryInfo> apply(Object 
o, Exception e) {
-                    if (e != null) { // Check error first.
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to request keys from preloader 
[keys=" + keys + ", err=" + e + ']');
-
-                        onDone(e);
-                    }
-
-                    Map<KeyCacheObject, Boolean> mappedKeys = null;
-
-                    // Assign keys to primary nodes.
-                    for (Map.Entry<KeyCacheObject, Boolean> key : 
keys.entrySet()) {
-                        int part = cctx.affinity().partition(key.getKey());
-
-                        if (retries == null || !retries.contains(part)) {
-                            if (!map(key.getKey(), parts)) {
-                                if (retries == null)
-                                    retries = new HashSet<>();
-
-                                retries.add(part);
-
-                                if (mappedKeys == null) {
-                                    mappedKeys = 
U.newLinkedHashMap(keys.size());
-
-                                    for (Map.Entry<KeyCacheObject, Boolean> 
key1 : keys.entrySet()) {
-                                        if (key1.getKey() == key.getKey())
-                                            break;
-
-                                        mappedKeys.put(key.getKey(), 
key1.getValue());
-                                    }
-                                }
-                            }
-                            else if (mappedKeys != null)
-                                mappedKeys.put(key.getKey(), key.getValue());
-                        }
-                    }
-
-                    // Add new future.
-                    add(getAsync(mappedKeys == null ? keys : mappedKeys));
-
-                    // Finish this one.
-                    return Collections.emptyList();
-                }
-            },
-            fut));
-    }
-
-    /**
      * @param key Key.
      * @param parts Parts to map.
      * @return {@code True} if mapped.

http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index d0b8092..b2da39c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -377,11 +378,38 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
         IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null :
             ctx.dht().dhtPreloader().request(req.keys(), 
req.topologyVersion());
 
-        if (keyFut == null || keyFut.isDone())
+        if (keyFut == null || keyFut.isDone()) {
+            if (keyFut != null) {
+                try {
+                    keyFut.get();
+                }
+                catch (NodeStoppingException e) {
+                    return;
+                }
+                catch (IgniteCheckedException e) {
+                    onForceKeysError(nodeId, req, e);
+
+                    return;
+                }
+            }
+
             processDhtLockRequest0(nodeId, req);
+        }
         else {
             keyFut.listen(new CI1<IgniteInternalFuture<Object>>() {
-                @Override public void apply(IgniteInternalFuture<Object> t) {
+                @Override public void apply(IgniteInternalFuture<Object> fut) {
+                    try {
+                        fut.get();
+                    }
+                    catch (NodeStoppingException e) {
+                        return;
+                    }
+                    catch (IgniteCheckedException e) {
+                        onForceKeysError(nodeId, req, e);
+
+                        return;
+                    }
+
                     processDhtLockRequest0(nodeId, req);
                 }
             });
@@ -391,6 +419,31 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     /**
      * @param nodeId Node ID.
      * @param req Request.
+     * @param e Error.
+     */
+    private void onForceKeysError(UUID nodeId, GridDhtLockRequest req, 
IgniteCheckedException e) {
+        GridDhtLockResponse res = new GridDhtLockResponse(ctx.cacheId(),
+            req.version(),
+            req.futureId(),
+            req.miniId(),
+            e,
+            ctx.deploymentEnabled());
+
+        try {
+            ctx.io().send(nodeId, res, ctx.ioPolicy());
+        }
+        catch (ClusterTopologyCheckedException e0) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send lock reply to remote node because it 
left grid: " + nodeId);
+        }
+        catch (IgniteCheckedException e0) {
+            U.error(log, "Failed to send lock reply to node: " + nodeId, e);
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
      */
     protected final void processDhtLockRequest0(UUID nodeId, 
GridDhtLockRequest req) {
         assert nodeId != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 6aad533..7361642 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -39,6 +39,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -1314,11 +1315,36 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     ) {
         IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), 
req.topologyVersion());
 
-        if (forceFut.isDone())
+        if (forceFut.isDone()) {
+            try {
+                forceFut.get();
+            }
+            catch (NodeStoppingException e) {
+                return;
+            }
+            catch (IgniteCheckedException e) {
+                onForceKeysError(nodeId, req, completionCb, e);
+
+                return;
+            }
+
             updateAllAsyncInternal0(nodeId, req, completionCb);
+        }
         else {
             forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
-                @Override public void apply(IgniteInternalFuture<Object> t) {
+                @Override public void apply(IgniteInternalFuture<Object> fut) {
+                    try {
+                        fut.get();
+                    }
+                    catch (NodeStoppingException e) {
+                        return;
+                    }
+                    catch (IgniteCheckedException e) {
+                        onForceKeysError(nodeId, req, completionCb, e);
+
+                        return;
+                    }
+
                     updateAllAsyncInternal0(nodeId, req, completionCb);
                 }
             });
@@ -1326,6 +1352,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
+     * @param nodeId Node ID.
+     * @param req Update request.
+     * @param completionCb Completion callback.
+     * @param e Error.
+     */
+    private void onForceKeysError(final UUID nodeId,
+        final GridNearAtomicUpdateRequest req,
+        final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> 
completionCb,
+        IgniteCheckedException e
+    ) {
+        GridNearAtomicUpdateResponse res = new 
GridNearAtomicUpdateResponse(ctx.cacheId(),
+            nodeId,
+            req.futureVersion(),
+            ctx.deploymentEnabled());
+
+        res.addFailedKeys(req.keys(), e);
+
+        completionCb.apply(req, res);
+    }
+
+    /**
      * Executes local update after preloader fetched values.
      *
      * @param nodeId Node ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 7970a44..4da1f38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -246,7 +246,11 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
 
             int curTopVer = topCntr.get();
 
-            preloader.addFuture(this);
+            if (!preloader.addFuture(this)) {
+                assert isDone() : this;
+
+                return false;
+            }
 
             trackable = true;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index f0054e4..1b51a4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -122,6 +123,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** */
     private final AtomicInteger partsEvictOwning = new AtomicInteger();
 
+    /** */
+    private volatile boolean stopping;
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new 
GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -244,6 +248,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         if (log.isDebugEnabled())
             log.debug("DHT rebalancer onKernalStop callback.");
 
+        stopping = true;
+
         cctx.events().removeListener(discoLsnr);
 
         // Acquire write busy lock.
@@ -255,8 +261,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         if (demander != null)
             demander.stop();
 
+        IgniteCheckedException err = stopError();
+
+        for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
+            fut.onDone(err);
+
         top = null;
     }
+    /**
+     * @return Node stop exception.
+     */
+    private IgniteCheckedException stopError() {
+        return new NodeStoppingException("Operation has been cancelled (cache 
or node is stopping).");
+    }
 
     /** {@inheritDoc} */
     @Override public void onInitialExchangeComplete(@Nullable Throwable err) {
@@ -756,9 +773,18 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
      * Adds future to future map.
      *
      * @param fut Future to add.
+     * @return {@code False} if node cache is stopping and future was 
completed with error.
      */
-    void addFuture(GridDhtForceKeysFuture<?, ?> fut) {
+    boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) {
         forceKeyFuts.put(fut.futureId(), fut);
+
+        if (stopping) {
+            fut.onDone(stopError());
+
+            return false;
+        }
+
+        return true;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 281b38e..0a18003 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1381,26 +1381,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override public void dumpStats() {
-        StringBuilder sb = new StringBuilder("Communication SPI recovery 
descriptors: ").append(U.nl());
+        IgniteLogger log = this.log;
 
-        for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : 
recoveryDescs.entrySet()) {
-            GridNioRecoveryDescriptor desc = entry.getValue();
+        if (log != null && log.isInfoEnabled()) {
+            StringBuilder sb = new StringBuilder("Communication SPI recovery 
descriptors: ").append(U.nl());
 
-            sb.append("    [key=").append(entry.getKey())
-                .append(", msgsSent=").append(desc.sent())
-                .append(", msgsAckedByRmt=").append(desc.acked())
-                .append(", msgsRcvd=").append(desc.received())
-                .append(", descIdHash=").append(System.identityHashCode(desc))
-                .append(']').append(U.nl());
-        }
+            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : 
recoveryDescs.entrySet()) {
+                GridNioRecoveryDescriptor desc = entry.getValue();
+
+                sb.append("    [key=").append(entry.getKey())
+                    .append(", msgsSent=").append(desc.sent())
+                    .append(", msgsAckedByRmt=").append(desc.acked())
+                    .append(", msgsRcvd=").append(desc.received())
+                    .append(", 
descIdHash=").append(System.identityHashCode(desc))
+                    .append(']').append(U.nl());
+            }
 
-        if (log.isInfoEnabled())
             log.info(sb.toString());
+        }
 
-        GridNioServer<Message> nioSrvr1 = nioSrvr;
+        GridNioServer<Message> nioSrvr = this.nioSrvr;
 
-        if (nioSrvr1 != null)
-            nioSrvr1.dumpStats();
+        if (nioSrvr != null)
+            nioSrvr.dumpStats();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java
index 07f3833..7743882 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java
@@ -60,6 +60,8 @@ public class IgniteCacheBinaryObjectsScanSelfTest extends GridCommonAbstractTest
     /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
         ldr = null;
+
+        stopAllGrids();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
index 8b3d9d3..90d5905 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
@@ -25,12 +25,15 @@ import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
 /**
  *
  */
@@ -45,6 +48,8 @@ public class IgniteCacheCreatePutTest extends GridCommonAbstractTest {
     protected IgniteConfiguration getConfiguration(String gridName) throws 
Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
         cfg.setPeerClassLoadingEnabled(false);
 
         TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
@@ -62,6 +67,7 @@ public class IgniteCacheCreatePutTest extends GridCommonAbstractTest {
         ccfg.setName("cache*");
         ccfg.setCacheMode(CacheMode.PARTITIONED);
         ccfg.setBackups(1);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
         cfg.setCacheConfiguration(ccfg);
 
@@ -70,7 +76,7 @@ public class IgniteCacheCreatePutTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
-        return 3 * 60 * 1000L;
+        return 5 * 60 * 1000L;
     }
 
     /** {@inheritDoc} */
@@ -96,8 +102,7 @@ public class IgniteCacheCreatePutTest extends GridCommonAbstractTest {
                     final AtomicInteger idx = new AtomicInteger();
 
                     GridTestUtils.runMultiThreaded(new Callable<Void>() {
-                        @Override
-                        public Void call() throws Exception {
+                        @Override public Void call() throws Exception {
                             int node = idx.getAndIncrement();
 
                             Ignite ignite = startGrid(node);

Reply via email to