5578

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8f6be3bc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8f6be3bc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8f6be3bc

Branch: refs/heads/ignite-5578
Commit: 8f6be3bc86a9d4fdd22040e705fe565ab2556278
Parents: d5631a5
Author: sboikov <[email protected]>
Authored: Tue Aug 1 10:53:09 2017 +0300
Committer: sboikov <[email protected]>
Committed: Tue Aug 1 12:08:45 2017 +0300

----------------------------------------------------------------------
 .../dht/GridDhtTransactionalCacheAdapter.java   |  2 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  2 +-
 .../cache/transactions/IgniteTxHandler.java     |  2 +-
 .../datastreamer/DataStreamProcessor.java       | 57 ++++++++++++--------
 .../datastreamer/DataStreamerImpl.java          | 56 +++++++++++--------
 .../org/apache/ignite/thread/IgniteThread.java  |  9 +++-
 ...CacheLoadingConcurrentGridStartSelfTest.java | 11 ++++
 ...ncurrentGridStartSelfTestAllowOverwrite.java | 33 ++++++++++++
 ...teCacheClientNodePartitionsExchangeTest.java | 48 +++++++++++------
 .../testsuites/IgniteCacheTestSuite2.java       |  2 +
 10 files changed, 157 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8f6be3bc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 79bd2f9..bf420cc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -687,7 +687,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, 
V> extends GridDhtCach
             if (curThread instanceof IgniteThread) {
                 final IgniteThread thread = (IgniteThread)curThread;
 
-                if (thread.hasStripeOrPolicy()) {
+                if (thread.cachePoolThread()) {
                     topFut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                             
ctx.kernalContext().closure().runLocalWithThreadPolicy(thread, new Runnable() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f6be3bc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 0f9b609..ae214a0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1900,7 +1900,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             if (curThread instanceof IgniteThread) {
                 final IgniteThread thread = (IgniteThread)curThread;
 
-                if (thread.hasStripeOrPolicy()) {
+                if (thread.cachePoolThread()) {
                     topFut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                             ctx.closures().runLocalWithThreadPolicy(thread, 
new Runnable() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f6be3bc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index fb9ed13..065e8a5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -573,7 +573,7 @@ public class IgniteTxHandler {
             if (curThread instanceof IgniteThread) {
                 final IgniteThread thread = (IgniteThread)curThread;
 
-                if (thread.hasStripeOrPolicy()) {
+                if (thread.cachePoolThread()) {
                     topFut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                             
ctx.kernalContext().closure().runLocalWithThreadPolicy(thread, new Runnable() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f6be3bc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 84d536f..8b984c0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -321,8 +321,10 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
         try {
             GridCacheAdapter cache = 
ctx.cache().internalCache(req.cacheName());
 
-            if (cache == null)
-                throw new IgniteCheckedException("Cache not created or already 
destroyed.");
+            if (cache == null) {
+                throw new IgniteCheckedException("Cache not created or already 
destroyed: " +
+                    req.cacheName());
+            }
 
             GridCacheContext cctx = cache.context();
 
@@ -336,18 +338,33 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
             GridDhtTopologyFuture topWaitFut = null;
 
             try {
-                GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
+                Exception remapErr = null;
+
+                AffinityTopologyVersion streamerFutTopVer = null;
+
+                if (!allowOverwrite) {
+                    GridDhtTopologyFuture topFut = 
cctx.topologyVersionFuture();
 
-                AffinityTopologyVersion topVer = fut.topologyVersion();
+                    AffinityTopologyVersion topVer = topFut.isDone() ? 
topFut.topologyVersion() :
+                        topFut.initialVersion();
+
+                    if (topVer.compareTo(req.topologyVersion()) > 0) {
+                        remapErr = new 
ClusterTopologyCheckedException("DataStreamer will retry " +
+                            "data transfer at stable topology [reqTop=" + 
req.topologyVersion() +
+                            ", topVer=" + topFut.initialVersion() + ", 
node=remote]");
+                    }
+                    else if (!topFut.isDone())
+                        topWaitFut = topFut;
+                    else
+                        streamerFutTopVer = topFut.topologyVersion();
+                }
 
-                if (!allowOverwrite && !topVer.equals(req.topologyVersion())) {
-                    Exception err = new ClusterTopologyCheckedException(
-                        "DataStreamer will retry data transfer at stable 
topology " +
-                            "[reqTop=" + req.topologyVersion() + ", topVer=" + 
topVer + ", node=remote]");
+                if (remapErr != null) {
+                    sendResponse(nodeId, topic, req.requestId(), remapErr, 
req.forceLocalDeployment());
 
-                    sendResponse(nodeId, topic, req.requestId(), err, 
req.forceLocalDeployment());
+                    return;
                 }
-                else if (allowOverwrite || fut.isDone()) {
+                else if (topWaitFut == null) {
                     job = new DataStreamerUpdateJob(ctx,
                         log,
                         req.cacheName(),
@@ -357,10 +374,8 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
                         req.keepBinary(),
                         updater);
 
-                    waitFut = allowOverwrite ? null : 
cctx.mvcc().addDataStreamerFuture(topVer);
+                    waitFut = allowOverwrite ? null : 
cctx.mvcc().addDataStreamerFuture(streamerFutTopVer);
                 }
-                else
-                    topWaitFut = fut;
             }
             finally {
                 if (!allowOverwrite)
@@ -378,16 +393,14 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
                 return;
             }
 
-            if (job != null) {
-                try {
-                    job.call();
+            try {
+                job.call();
 
-                    sendResponse(nodeId, topic, req.requestId(), null, 
req.forceLocalDeployment());
-                }
-                finally {
-                    if (waitFut != null)
-                        waitFut.onDone();
-                }
+                sendResponse(nodeId, topic, req.requestId(), null, 
req.forceLocalDeployment());
+            }
+            finally {
+                if (waitFut != null)
+                    waitFut.onDone();
             }
         }
         catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f6be3bc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 25b319f..afaa81d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1524,23 +1524,36 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
             try {
                 GridCacheContext cctx = 
ctx.cache().internalCache(cacheName).context();
 
-                final boolean allowOverwrite = allowOverwrite();
-                final boolean loc = cctx.isLocal();
+                final boolean lockTop = !cctx.isLocal() && !allowOverwrite();
 
-                if (!loc && !allowOverwrite)
+                GridDhtTopologyFuture topWaitFut = null;
+
+                if (lockTop)
                     cctx.topology().readLock();
 
                 try {
-                    GridDhtTopologyFuture fut = loc ? null : 
cctx.topologyVersionFuture();
+                    AffinityTopologyVersion streamerFutTopVer = null;
+
+                    if (lockTop) {
+                        GridDhtTopologyFuture topFut = 
cctx.topologyVersionFuture();
+
+                        AffinityTopologyVersion topVer = topFut.isDone() ? 
topFut.topologyVersion() :
+                            topFut.initialVersion();
 
-                    AffinityTopologyVersion topVer = loc ? reqTopVer : 
fut.topologyVersion();
+                        if (topVer.compareTo(reqTopVer) > 0) {
+                            curFut.onDone(new 
IgniteCheckedException("DataStreamer will retry data transfer " +
+                                "at stable topology. reqTop=" + reqTopVer + ", 
topVer=" + topFut.initialVersion() +
+                                ", node=local]"));
 
-                    if (!allowOverwrite && !topVer.equals(reqTopVer)) {
-                        curFut.onDone(new IgniteCheckedException(
-                            "DataStreamer will retry data transfer at stable 
topology. " +
-                                "[reqTop=" + reqTopVer + ", topVer=" + topVer 
+ ", node=local]"));
+                            return;
+                        }
+                        else if (!topFut.isDone())
+                            topWaitFut = topFut;
+                        else
+                            streamerFutTopVer = topFut.topologyVersion();
                     }
-                    else if (loc || allowOverwrite || fut.isDone()) {
+
+                    if (topWaitFut == null) {
                         IgniteInternalFuture<Object> callFut = 
ctx.closure().callLocalSafe(
                             new DataStreamerUpdateJob(
                                 ctx,
@@ -1555,9 +1568,8 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
 
                         locFuts.add(callFut);
 
-                        final GridFutureAdapter waitFut = (loc || 
allowOverwrite) ?
-                            null :
-                            cctx.mvcc().addDataStreamerFuture(topVer);
+                        final GridFutureAdapter waitFut =
+                            lockTop ? 
cctx.mvcc().addDataStreamerFuture(streamerFutTopVer) : null;
 
                         callFut.listen(new 
IgniteInClosure<IgniteInternalFuture<Object>>() {
                             @Override public void 
apply(IgniteInternalFuture<Object> t) {
@@ -1578,18 +1590,20 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                             }
                         });
                     }
-                    else {
-                        fut.listen(new 
IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                            @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
-                                localUpdate(entries, reqTopVer, curFut, plc);
-                            }
-                        });
-                    }
                 }
                 finally {
-                    if (!loc && !allowOverwrite)
+                    if (lockTop)
                         cctx.topology().readUnlock();
                 }
+
+                if (topWaitFut != null) {
+                    // Need call 'listen' after topology read lock is released.
+                    topWaitFut.listen(new 
IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
+                            localUpdate(entries, reqTopVer, curFut, plc);
+                        }
+                    });
+                }
             }
             catch (Throwable ex) {
                 curFut.onDone(new IgniteCheckedException("DataStreamer data 
handling failed.", ex));

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f6be3bc/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java 
b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
index f0d39fb..83a0384 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
@@ -125,8 +125,13 @@ public class IgniteThread extends Thread {
         return stripe;
     }
 
-    public boolean hasStripeOrPolicy() {
-        return stripe >= 0 || plc != GridIoPolicy.UNDEFINED;
+    /**
+     * @return {@code True} if thread belongs to pool processing cache 
operations.
+     */
+    public boolean cachePoolThread() {
+        return stripe >= 0 ||
+            plc == GridIoPolicy.SYSTEM_POOL ||
+            plc == GridIoPolicy.UTILITY_CACHE_POOL;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f6be3bc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
index 68e88ce..1e046d4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
@@ -29,6 +29,7 @@ import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.store.CacheStoreAdapter;
 import org.apache.ignite.cluster.ClusterNode;
@@ -48,6 +49,7 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
@@ -83,6 +85,8 @@ public class CacheLoadingConcurrentGridStartSelfTest extends 
GridCommonAbstractT
 
         CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
 
+        ccfg.setAtomicityMode(atomicityMode());
+
         ccfg.setCacheMode(PARTITIONED);
 
         ccfg.setBackups(1);
@@ -112,6 +116,13 @@ public class CacheLoadingConcurrentGridStartSelfTest 
extends GridCommonAbstractT
         return cfg;
     }
 
+    /**
+     * @return Cache atomicity mode.
+     */
+    protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         stopAllGrids();

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f6be3bc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheTxLoadingConcurrentGridStartSelfTestAllowOverwrite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheTxLoadingConcurrentGridStartSelfTestAllowOverwrite.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheTxLoadingConcurrentGridStartSelfTestAllowOverwrite.java
new file mode 100644
index 0000000..f70b195
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheTxLoadingConcurrentGridStartSelfTestAllowOverwrite.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ *
+ */
+public class CacheTxLoadingConcurrentGridStartSelfTestAllowOverwrite extends
+    CacheLoadingConcurrentGridStartSelfTestAllowOverwrite {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f6be3bc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
index d03ae9e..8090c66 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
@@ -58,6 +58,8 @@ import 
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static 
org.apache.ignite.internal.processors.cache.ExchangeContext.IGNITE_EXCHANGE_COMPATIBILITY_MODE;
+
 /**
  *
  */
@@ -197,31 +199,42 @@ public class IgniteCacheClientNodePartitionsExchangeTest 
extends GridCommonAbstr
      * @throws Exception If failed.
      */
     public void testPartitionsExchange() throws Exception {
-        partitionsExchange();
+        partitionsExchange(false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    private void partitionsExchange() throws Exception {
+    public void testPartitionsExchangeCompatibilityMode() throws Exception {
+        System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE, "true");
+
+        try {
+            partitionsExchange(true);
+        }
+        finally {
+            System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE);
+        }
+    }
+
+    /**
+     * @param compatibilityMode {@code True} to test old exchange protocol.
+     * @throws Exception If failed.
+     */
+    private void partitionsExchange(boolean compatibilityMode) throws 
Exception {
         Ignite ignite0 = startGrid(0);
 
         TestCommunicationSpi spi0 = 
(TestCommunicationSpi)ignite0.configuration().getCommunicationSpi();
 
         Ignite ignite1 = startGrid(1);
 
-        boolean lateAff = ignite1.configuration().isLateAffinityAssignment();
-
-        int minorVer = lateAff ? 1 : 0;
-
-        waitForTopologyUpdate(2, new AffinityTopologyVersion(2, minorVer));
+        waitForTopologyUpdate(2, new AffinityTopologyVersion(2, 1));
 
         TestCommunicationSpi spi1 = 
(TestCommunicationSpi)ignite1.configuration().getCommunicationSpi();
 
         assertEquals(0, spi0.partitionsSingleMessages());
-        assertEquals(lateAff ? 2 : 1, spi0.partitionsFullMessages());
+        assertEquals(2, spi0.partitionsFullMessages());
 
-        assertEquals(lateAff ? 2 : 1, spi1.partitionsSingleMessages());
+        assertEquals(2, spi1.partitionsSingleMessages());
         assertEquals(0, spi1.partitionsFullMessages());
 
         spi0.reset();
@@ -281,23 +294,23 @@ public class IgniteCacheClientNodePartitionsExchangeTest 
extends GridCommonAbstr
 
         Ignite ignite4 = startGrid(4);
 
-        waitForTopologyUpdate(5, new AffinityTopologyVersion(5, lateAff ? 1 : 
0));
+        waitForTopologyUpdate(5, new AffinityTopologyVersion(5, 1));
 
         TestCommunicationSpi spi4 = 
(TestCommunicationSpi)ignite4.configuration().getCommunicationSpi();
 
         assertEquals(0, spi0.partitionsSingleMessages());
-        assertEquals(lateAff ? 8 : 4, spi0.partitionsFullMessages());
+        assertEquals(8, spi0.partitionsFullMessages());
 
-        assertEquals(lateAff ? 2 : 1, spi1.partitionsSingleMessages());
+        assertEquals(2, spi1.partitionsSingleMessages());
         assertEquals(0, spi1.partitionsFullMessages());
 
-        assertEquals(lateAff ? 2 : 1, spi2.partitionsSingleMessages());
+        assertEquals(2, spi2.partitionsSingleMessages());
         assertEquals(0, spi2.partitionsFullMessages());
 
-        assertEquals(lateAff ? 2 : 1, spi3.partitionsSingleMessages());
+        assertEquals(2, spi3.partitionsSingleMessages());
         assertEquals(0, spi3.partitionsFullMessages());
 
-        assertEquals(lateAff ? 2 : 1, spi4.partitionsSingleMessages());
+        assertEquals(2, spi4.partitionsSingleMessages());
         assertEquals(0, spi4.partitionsFullMessages());
 
         spi0.reset();
@@ -307,9 +320,10 @@ public class IgniteCacheClientNodePartitionsExchangeTest 
extends GridCommonAbstr
 
         log.info("Stop server node.");
 
-        ignite4.close(); // With late affinity exchange on server leave is 
completed by discovery message.
+        ignite4.close();
 
-        if (lateAff) {
+        if (compatibilityMode) {
+            // With late affinity old protocol exchange on server leave is 
completed by discovery message.
             // With FairAffinityFunction affinity calculation is different, 
this causes one more topology change.
             boolean exchangeAfterRebalance = false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f6be3bc/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 86d09fd..0e1aaec 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -55,6 +55,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcu
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTestAllowOverwrite;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheLockReleaseNodeLeaveTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
+import 
org.apache.ignite.internal.processors.cache.distributed.CacheTxLoadingConcurrentGridStartSelfTestAllowOverwrite;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionNotLoadedEventSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionedNearDisabledTxMultiThreadedSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheTransformEventSelfTest;
@@ -224,6 +225,7 @@ public class IgniteCacheTestSuite2 extends TestSuite {
         suite.addTest(new 
TestSuite(GridCachePartitionedPreloadLifecycleSelfTest.class));
         suite.addTest(new 
TestSuite(CacheLoadingConcurrentGridStartSelfTest.class));
         suite.addTest(new 
TestSuite(CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.class));
+        suite.addTest(new 
TestSuite(CacheTxLoadingConcurrentGridStartSelfTestAllowOverwrite.class));
         suite.addTest(new TestSuite(GridCacheDhtPreloadDelayedSelfTest.class));
         suite.addTest(new TestSuite(GridPartitionedBackupLoadSelfTest.class));
         suite.addTest(new 
TestSuite(GridCachePartitionedLoadCacheSelfTest.class));

Reply via email to