ignite-5075

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9b850cc1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9b850cc1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9b850cc1

Branch: refs/heads/ignite-5075
Commit: 9b850cc181bcd25f2362edeed33f7aa8a097f555
Parents: e4be5ab
Author: sboikov <[email protected]>
Authored: Mon May 15 12:05:07 2017 +0300
Committer: sboikov <[email protected]>
Committed: Mon May 15 12:26:29 2017 +0300

----------------------------------------------------------------------
 .../distributed/GridDistributedBaseMessage.java |   1 -
 .../cache/transactions/IgniteTxEntry.java       |   4 +-
 .../processors/cache/IgniteCacheGroupsTest.java | 187 ++++++++++++++++++-
 3 files changed, 182 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9b850cc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
index 65b16a4..fc209aa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionable;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b850cc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 163ed99..30aa335 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -565,7 +565,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, 
Message {
      */
     public void cached(GridCacheEntryEx entry) {
         assert entry == null || entry.context() == ctx : "Invalid entry 
assigned to tx entry [txEntry=" + this +
-            ", entry=" + entry + ", ctxNear=" + ctx.isNear() + ", ctxDht=" + 
ctx.isDht() + ']';
+            ", entry=" + entry +
+            ", ctxNear=" + ctx.isNear() +
+            ", ctxDht=" + ctx.isDht() + ']';
 
         this.entry = entry;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b850cc1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index 39dc044..c10321e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -19,15 +19,21 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.io.Serializable;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -35,6 +41,8 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
@@ -84,7 +92,7 @@ public class IgniteCacheGroupsTest extends 
GridCommonAbstractTest {
 
         Ignite client = startGrid(1);
 
-        IgniteCache c1 = client.createCache(cacheConfiguration(GROUP1, "c1", 
ATOMIC, 0));
+        IgniteCache c1 = client.createCache(cacheConfiguration(GROUP1, "c1", 
PARTITIONED, ATOMIC, 0));
 
         checkCacheGroup(0, GROUP1, true);
         checkCacheGroup(0, GROUP1, true);
@@ -137,7 +145,7 @@ public class IgniteCacheGroupsTest extends 
GridCommonAbstractTest {
         for (int iter = 0; iter < 3; iter++) {
             log.info("Iteration: " + iter);
 
-            srv0.createCache(cacheConfiguration(GROUP1, "cache1", ATOMIC, 2));
+            srv0.createCache(cacheConfiguration(GROUP1, "cache1", PARTITIONED, 
ATOMIC, 2));
 
             for (int i = 0; i < srvs; i++) {
                 checkCacheGroup(i, GROUP1, true);
@@ -145,7 +153,7 @@ public class IgniteCacheGroupsTest extends 
GridCommonAbstractTest {
                 checkCache(i, "cache1");
             }
 
-            srv0.createCache(cacheConfiguration(GROUP1, "cache2", ATOMIC, 2));
+            srv0.createCache(cacheConfiguration(GROUP1, "cache2", PARTITIONED, 
ATOMIC, 2));
 
             for (int i = 0; i < srvs; i++) {
                 checkCacheGroup(i, GROUP1, true);
@@ -193,8 +201,10 @@ public class IgniteCacheGroupsTest extends 
GridCommonAbstractTest {
         Ignite srv0 = startGrid(0);
 
         {
-            IgniteCache<Object, Object> cache1 = 
srv0.createCache(cacheConfiguration("grp1", "cache1", ATOMIC, 2));
-            IgniteCache<Object, Object> cache2 = 
srv0.createCache(cacheConfiguration("grp1", "cache2", ATOMIC, 2));
+            IgniteCache<Object, Object> cache1 =
+                srv0.createCache(cacheConfiguration("grp1", "cache1", 
PARTITIONED, ATOMIC, 2));
+            IgniteCache<Object, Object> cache2 =
+                srv0.createCache(cacheConfiguration("grp1", "cache2", 
PARTITIONED, ATOMIC, 2));
 
             cache1.put(new Key1(1), 1);
             assertEquals(1, cache1.get(new Key1(1)));
@@ -231,8 +241,10 @@ public class IgniteCacheGroupsTest extends 
GridCommonAbstractTest {
         Ignite srv0 = startGrid(0);
 
         {
-            IgniteCache<Object, Object> cache1 = 
srv0.createCache(cacheConfiguration("grp1", "cache1", ATOMIC, 0));
-            IgniteCache<Object, Object> cache2 = 
srv0.createCache(cacheConfiguration("grp1", "cache2", ATOMIC, 0));
+            IgniteCache<Object, Object> cache1 =
+                srv0.createCache(cacheConfiguration(GROUP1, "cache1", 
PARTITIONED, ATOMIC, 0));
+            IgniteCache<Object, Object> cache2 =
+                srv0.createCache(cacheConfiguration(GROUP1, "cache2", 
PARTITIONED, ATOMIC, 0));
 
             for (int i = 0; i < 10; i++) {
                 cache1.put(new Key1(i), 1);
@@ -246,6 +258,162 @@ public class IgniteCacheGroupsTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testCacheApiTx() throws Exception {
+        startGridsMultiThreaded(4);
+
+        client = true;
+
+        startGrid(4);
+
+        cacheApiTest(PARTITIONED, TRANSACTIONAL, 2);
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Atomicity mode.
+     * @param backups Number of backups.
+     */
+    private void cacheApiTest(CacheMode cacheMode, CacheAtomicityMode 
atomicityMode, int backups) {
+        for (int i = 0; i < 2; i++)
+            ignite(0).createCache(cacheConfiguration(GROUP1, "cache-" + i, 
cacheMode, atomicityMode, backups));
+
+        for (Ignite node : Ignition.allGrids()) {
+            for (int i = 0; i < 2; i++) {
+                IgniteCache cache = node.cache("cache-" + i);
+
+                log.info("Test cache [node=" + node.name() + ", cache=" + 
cache.getName() +
+                    ", mode=" + cacheMode + ", atomicity=" + atomicityMode + 
", backups=" + backups + ']');
+
+                cacheApiTest(cache);
+            }
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void cacheApiTest(IgniteCache cache) {
+        int key = 1;
+
+        cache.put(key, 1);
+
+        cache.remove(key);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentOperations() throws Exception {
+        final int SRVS = 4;
+        final int CLIENTS = 4;
+        final int NODES = SRVS + CLIENTS;
+
+        Ignite srv0 = startGridsMultiThreaded(SRVS);
+
+        client = true;
+
+        startGridsMultiThreaded(SRVS, CLIENTS);
+
+        final int CACHES = 4;
+
+        for (int i = 0; i < CACHES; i++) {
+            srv0.createCache(cacheConfiguration(GROUP1, GROUP1 + "-" + i, 
PARTITIONED, ATOMIC, i));
+            srv0.createCache(cacheConfiguration(GROUP2, GROUP2 + "-" + i, 
PARTITIONED, TRANSACTIONAL, i));
+        }
+
+        final AtomicInteger idx = new AtomicInteger();
+
+        final AtomicBoolean err = new AtomicBoolean();
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture opFut = GridTestUtils.runMultiThreadedAsync(new 
Runnable() {
+            @Override public void run() {
+                try {
+                    Ignite node = ignite(idx.getAndIncrement() % NODES);
+
+                    log.info("Start thread [node=" + node.name() + ']');
+
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    while (!stop.get()) {
+                        String grp = rnd.nextBoolean() ? GROUP1 : GROUP2;
+                        int cacheIdx = rnd.nextInt(CACHES);
+
+                        IgniteCache cache = node.cache(grp + "-" + cacheIdx);
+
+                        for (int i = 0; i < 10; i++)
+                            cacheOperation(rnd, cache);
+                    }
+                }
+                catch (Exception e) {
+                    err.set(true);
+
+                    log.error("Unexpected error: " + e, e);
+
+                    stop.set(true);
+                }
+            }
+        }, (SRVS + CLIENTS) * 2, "op-thread");
+
+        IgniteInternalFuture cacheFut = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (!stop.get()) {
+//                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+//
+//                        String grp = rnd.nextBoolean() ? GROUP1 : GROUP2;
+//
+//                        Ignite node = ignite(rnd.nextInt(NODES));
+//
+//                        IgniteCache cache = 
node.createCache(cacheConfiguration(grp, "tmpCache",
+//                            rnd.nextBoolean() ? ATOMIC : TRANSACTIONAL,
+//                            rnd.nextInt(3)));
+//
+//                        for (int i = 0; i < 10; i++)
+//                            cacheOperation(rnd, cache);
+//
+//                        node.destroyCache(cache.getName());
+
+                        U.sleep(1000);
+                    }
+                }
+                catch (Exception e) {
+                    err.set(true);
+
+                    log.error("Unexpected error: " + e, e);
+
+                    stop.set(true);
+                }
+            }
+        }, "cache-thread");
+
+        try {
+            U.sleep(10_000);
+        }
+        finally {
+            stop.set(true);
+        }
+
+        opFut.get();
+        cacheFut.get();
+
+        assertFalse("Unexpected error, see log for details", err.get());
+    }
+
+    /**
+     * @param rnd Random.
+     * @param cache Cache.
+     */
+    private void cacheOperation(ThreadLocalRandom rnd, IgniteCache cache) {
+        int key = rnd.nextInt(1000);
+
+        cache.put(key, 1);
+    }
+
+    /**
      *
      */
     static class Key1 implements Serializable {
@@ -311,8 +479,10 @@ public class IgniteCacheGroupsTest extends 
GridCommonAbstractTest {
         }
     }
 
-    private CacheConfiguration cacheConfiguration(String grpName,
+    private CacheConfiguration cacheConfiguration(
+        String grpName,
         String name,
+        CacheMode cacheMode,
         CacheAtomicityMode atomicityMode,
         int backups) {
         CacheConfiguration ccfg = new CacheConfiguration();
@@ -321,6 +491,7 @@ public class IgniteCacheGroupsTest extends 
GridCommonAbstractTest {
         ccfg.setGroupName(grpName);
         ccfg.setAtomicityMode(atomicityMode);
         ccfg.setBackups(backups);
+        ccfg.setCacheMode(cacheMode);
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
         return ccfg;

Reply via email to