http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index 46a5f8c..5f5dfd4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -75,6 +75,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; @@ -192,12 +193,6 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } } - for (int i = 0; i < gridCount(); i++) - assertEquals("Cache is not empty [entrySet=" + grid(i).cache(null).localEntries() + - ", i=" + i + ']', - 0, grid(i).cache(null).localSize()); - - for (int i = 0; i < gridCount(); i++) { GridContinuousProcessor proc = grid(i).context().continuous(); @@ -368,6 +363,114 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo /** * @throws Exception If failed. */ + public void testTwoQueryListener() throws Exception { + if (cacheMode() == LOCAL) + return; + + IgniteCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache1 = grid(1).cache(null); + + final AtomicInteger cntr = new AtomicInteger(0); + final AtomicInteger cntr1 = new AtomicInteger(0); + + ContinuousQuery<Integer, Integer> qry1 = new ContinuousQuery<>(); + ContinuousQuery<Integer, Integer> qry2 = new ContinuousQuery<>(); + + qry1.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> ignore : evts) + cntr.incrementAndGet(); + } + }); + + qry2.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> ignore : evts) + cntr1.incrementAndGet(); + } + }); + + try (QueryCursor<Cache.Entry<Integer, Integer>> query2 = cache1.query(qry2); + QueryCursor<Cache.Entry<Integer, Integer>> query1 = cache.query(qry1)) { + for (int i = 0; i < gridCount(); i++) { + IgniteCache<Object, Object> cache0 = grid(i).cache(null); + + cache0.put(1, 1); + cache0.put(2, 2); + cache0.put(3, 3); + + cache0.remove(1); + cache0.remove(2); + cache0.remove(3); + + final int iter = i + 1; + + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return iter * 6 /* count operation */ * 2 /* count continues queries*/ + == (cntr.get() + cntr1.get()); + } + }, 5000L); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testRestartQuery() throws Exception { + if (cacheMode() == LOCAL) + return; + + IgniteCache<Integer, Integer> cache = grid(0).cache(null); + + final int parts = grid(0).affinity(null).partitions(); + + final int keyCnt = parts * 2; + + for (int i = 0; i < parts / 2; i++) + cache.put(i, i); + + for (int i = 0; i < 10; i++) { + if (i % 2 == 0) { + final AtomicInteger cntr = new AtomicInteger(0); + + ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated( + Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> ignore : evts) + cntr.incrementAndGet(); + } + }); + + QueryCursor<Cache.Entry<Integer, Integer>> query = cache.query(qry); + + for (int key = 0; key < keyCnt; key++) + cache.put(key, key); + + try { + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return cntr.get() == keyCnt; + } + }, 2000L); + } + finally { + query.close(); + } + } + else { + for (int key = 0; key < keyCnt; key++) + cache.put(key, key); + } + } + } + + /** + * @throws Exception If failed. + */ public void testEntriesByFilter() throws Exception { IgniteCache<Integer, Integer> cache = grid(0).cache(null); @@ -852,44 +955,6 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo /** * @throws Exception If failed. */ - public void testNodeJoin() throws Exception { - IgniteCache<Integer, Integer> cache = grid(0).cache(null); - - ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); - - final Collection<CacheEntryEvent<? extends Integer, ? extends Integer>> all = new ConcurrentLinkedDeque8<>(); - final CountDownLatch latch = new CountDownLatch(30); - - qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) - all.add(evt); - - latch.countDown(); - } - }); - - try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { - cache.put(0, 0); - - startGrid("anotherGrid"); - - for (int i = 1; i < 30; i++) { - cache.put(i, i); - } - - assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : all; - - assertEquals(30, all.size()); - } - finally { - stopGrid("anotherGrid"); - } - } - - /** - * @throws Exception If failed. - */ @SuppressWarnings("TryFinallyCanBeTryWithResources") public void testNodeJoinWithoutCache() throws Exception { IgniteCache<Integer, Integer> cache = grid(0).cache(null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java new file mode 100644 index 0000000..91b6b9c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.NearCacheConfiguration; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; + +/** + * Continuous queries tests for atomic cache. + */ +public class GridCacheContinuousQueryTxSelfTest extends GridCacheContinuousQueryPartitionedSelfTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } + + /** {@inheritDoc} */ + @Override public void testInternalKey() throws Exception { + // No-op. + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java new file mode 100644 index 0000000..2e1d78d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.concurrent.CountDownLatch; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteClientReconnectAbstractTest; +import org.apache.ignite.resources.LoggerResource; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheContinuousQueryClientReconnectTest extends IgniteClientReconnectAbstractTest { + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(atomicMode()); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * @return Atomic mode. + */ + protected CacheAtomicityMode atomicMode() { + return ATOMIC; + } + + /** + * @throws Exception If failed. + */ + public void testReconnectClient() throws Exception { + Ignite client = grid(serverCount()); + + Ignite srv = clientRouter(client); + + assertTrue(client.cluster().localNode().isClient()); + + final CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + IgniteCache<Object, Object> clnCache = client.cache(null); + + QueryCursor<?> cur = clnCache.query(qry); + + int keyCnt = 100; + + for (int i = 0; i < 10; i++) { + lsnr.latch = new CountDownLatch(keyCnt); + + for (int key = 0; key < keyCnt; key++) + clnCache.put(key, key); + + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + + reconnectClientNode(client, srv, null); + + lsnr.latch = new CountDownLatch(keyCnt); + + for (int key = 0; key < keyCnt; key++) + clnCache.put(key, key); + + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + } + + cur.close(); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectClientAndLeftRouter() throws Exception { + Ignite client = grid(serverCount()); + + final Ignite srv = clientRouter(client); + + final String clnRouterName = srv.name(); + + assertTrue(client.cluster().localNode().isClient()); + + final CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + IgniteCache<Object, Object> clnCache = client.cache(null); + + QueryCursor<?> cur = clnCache.query(qry); + + int keyCnt = 100; + + lsnr.latch = new CountDownLatch(keyCnt); + + for (int key = 0; key < keyCnt; key++) + clnCache.put(key, key); + + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + stopGrid(clnRouterName); + } + }); + + assertFalse("Client connected to the same server node.", clnRouterName.equals(clientRouter(client).name())); + + lsnr.latch = new CountDownLatch(keyCnt); + + for (int key = 0; key < keyCnt; key++) + clnCache.put(key, key); + + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + + cur.close(); + } + + /** + * + */ + private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> { + /** */ + private volatile CountDownLatch latch = new CountDownLatch(1); + + /** */ + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) { + log.info("Received cache event: " + evt); + + latch.countDown(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java index 1afeb05..534f298 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java @@ -27,11 +27,13 @@ import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static java.util.concurrent.TimeUnit.SECONDS; @@ -83,11 +85,13 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest client = true; - Ignite clientNode = startGrid(3); + final int CLIENT_ID = 3; + + Ignite clientNode = startGrid(CLIENT_ID); client = false; - CacheEventListener lsnr = new CacheEventListener(); + final CacheEventListener lsnr = new CacheEventListener(); ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); @@ -95,27 +99,154 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest QueryCursor<?> cur = clientNode.cache(null).query(qry); - Ignite joined1 = startGrid(4); + for (int i = 0; i < 10; i++) { + log.info("Start iteration: " + i); + + lsnr.latch = new CountDownLatch(1); + + Ignite joined1 = startGrid(4); + + IgniteCache<Object, Object> joinedCache1 = joined1.cache(null); + + joinedCache1.put(primaryKey(joinedCache1), 1); + + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + + lsnr.latch = new CountDownLatch(1); + + Ignite joined2 = startGrid(5); + + IgniteCache<Object, Object> joinedCache2 = joined2.cache(null); - IgniteCache<Object, Object> joinedCache1 = joined1.cache(null); + joinedCache2.put(primaryKey(joinedCache2), 2); - joinedCache1.put(primaryKey(joinedCache1), 1); + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); - assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + stopGrid(4); + + stopGrid(5); + } cur.close(); + } + + /** + * @throws Exception If failed. + */ + public void testNodeJoinsRestartQuery() throws Exception { + startGrids(2); + + client = true; + + final int CLIENT_ID = 3; + + Ignite clientNode = startGrid(CLIENT_ID); + + client = false; + + for (int i = 0; i < 10; i++) { + log.info("Start iteration: " + i); + + final CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + QueryCursor<?> cur = clientNode.cache(null).query(qry); + + lsnr.latch = new CountDownLatch(1); + + Ignite joined1 = startGrid(4); + + IgniteCache<Object, Object> joinedCache1 = joined1.cache(null); + + joinedCache1.put(primaryKey(joinedCache1), 1); + + assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); + + cur.close(); - lsnr.latch = new CountDownLatch(1); + lsnr.latch = new CountDownLatch(1); - Ignite joined2 = startGrid(5); + Ignite joined2 = startGrid(5); - IgniteCache<Object, Object> joinedCache2 = joined2.cache(null); + IgniteCache<Object, Object> joinedCache2 = joined2.cache(null); - joinedCache2.put(primaryKey(joinedCache2), 2); + joinedCache2.put(primaryKey(joinedCache2), 2); - U.sleep(1000); + assertFalse("Unexpected event received.", GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return 1 != lsnr.latch.getCount(); + } + }, 1000)); - assertEquals("Unexpected event received.", 1, lsnr.latch.getCount()); + stopGrid(4); + + stopGrid(5); + } + } + + /** + * @throws Exception If failed. + */ + public void testServerNodeLeft() throws Exception { + startGrids(3); + + client = true; + + final int CLIENT_ID = 3; + + Ignite clnNode = startGrid(CLIENT_ID); + + client = false; + + IgniteOutClosure<IgniteCache<Integer, Integer>> rndCache = + new IgniteOutClosure<IgniteCache<Integer, Integer>>() { + int cnt = 0; + + @Override public IgniteCache<Integer, Integer> apply() { + ++cnt; + + return grid(CLIENT_ID).cache(null); + } + }; + + final CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + QueryCursor<?> cur = clnNode.cache(null).query(qry); + + boolean first = true; + + int keyCnt = 1; + + for (int i = 0; i < 10; i++) { + log.info("Start iteration: " + i); + + if (first) + first = false; + else { + for (int srv = 0; srv < CLIENT_ID - 1; srv++) + startGrid(srv); + } + + lsnr.latch = new CountDownLatch(keyCnt); + + for (int key = 0; key < keyCnt; key++) + rndCache.apply().put(key, key); + + assertTrue("Failed to wait for event. Left events: " + lsnr.latch.getCount(), + lsnr.latch.await(10, SECONDS)); + + for (int srv = 0; srv < CLIENT_ID - 1; srv++) + stopGrid(srv); + } + + cur.close(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java new file mode 100644 index 0000000..a10ebc9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicityMode; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; + +/** + * + */ +public class IgniteCacheContinuousQueryClientTxReconnectTest extends IgniteCacheContinuousQueryClientReconnectTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicMode() { + return TRANSACTIONAL; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java index 108b11d..ba5e15b 100644 --- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java @@ -23,7 +23,11 @@ import java.net.URLClassLoader; import org.apache.ignite.Ignite; import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.config.GridTestProperties; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.testframework.junits.common.GridCommonTest; @@ -41,6 +45,9 @@ public class GridP2PSameClassLoaderSelfTest extends GridCommonAbstractTest { private static final String TEST_TASK2_NAME = "org.apache.ignite.tests.p2p.P2PTestTaskExternalPath2"; /** */ + private static final TcpDiscoveryIpFinder FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ private static final ClassLoader CLASS_LOADER; /** Current deployment mode. Used in {@link #getConfiguration(String)}. */ @@ -66,6 +73,7 @@ public class GridP2PSameClassLoaderSelfTest extends GridCommonAbstractTest { cfg.setDeploymentMode(depMode); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setHeartbeatFrequency(500); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(FINDER); cfg.setCacheConfiguration(); @@ -81,10 +89,16 @@ public class GridP2PSameClassLoaderSelfTest extends GridCommonAbstractTest { @SuppressWarnings({"unchecked"}) private void processTest(boolean isIsolatedDifferentTask, boolean isIsolatedDifferentNode) throws Exception { try { - Ignite ignite1 = startGrid(1); + final Ignite ignite1 = startGrid(1); Ignite ignite2 = startGrid(2); Ignite ignite3 = startGrid(3); + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return ignite1.cluster().nodes().size() == 3; + } + }, 20000L); + Class task1 = CLASS_LOADER.loadClass(TEST_TASK1_NAME); Class task2 = CLASS_LOADER.loadClass(TEST_TASK2_NAME); http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 3e41979..6f9c559 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -1248,7 +1248,7 @@ public abstract class GridAbstractTest extends TestCase { if (isDebug()) { discoSpi.setMaxMissedHeartbeats(Integer.MAX_VALUE); - cfg.setNetworkTimeout(Long.MAX_VALUE); + cfg.setNetworkTimeout(Long.MAX_VALUE / 3); } else { // Set network timeout to 10 sec to avoid unexpected p2p class loading errors. http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index c19e718..e0ffc60 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -431,6 +431,9 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { for (IgniteCacheProxy<?, ?> c : g0.context().cache().jcaches()) { CacheConfiguration cfg = c.context().config(); + if (cfg == null) + continue; + if (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceMode() != NONE && g.cluster().nodes().size() > 1) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 6cb1a52..6cc2599 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -65,6 +65,10 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest; import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest; @@ -77,7 +81,10 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedOneNodeSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedP2PDisabledSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest; import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest; +import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest; import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryAtomicSelfTest; import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryLocalSelfTest; import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryPartitionedSelfTest; @@ -157,11 +164,18 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheContinuousQueryPartitionedSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryPartitionedOnlySelfTest.class); suite.addTestSuite(GridCacheContinuousQueryPartitionedP2PDisabledSelfTest.class); + suite.addTestSuite(GridCacheContinuousQueryTxSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class); - suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class); suite.addTestSuite(GridCacheContinuousQueryReplicatedOneNodeSelfTest.class); + suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class); + suite.addTestSuite(IgniteCacheContinuousQueryClientReconnectTest.class); + suite.addTestSuite(IgniteCacheContinuousQueryClientTxReconnectTest.class); + suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.class); + suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedSelfTest.class); + suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class); + suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class); // Reduce fields queries. suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java new file mode 100644 index 0000000..e42479a --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.yardstickframework.BenchmarkConfiguration; +import org.yardstickframework.BenchmarkDriver; +import org.yardstickframework.BenchmarkProbe; +import org.yardstickframework.BenchmarkProbePoint; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.yardstickframework.BenchmarkUtils.errorHelp; +import static org.yardstickframework.BenchmarkUtils.println; + +/** + * Probe which calculate continuous query events. + */ +public class CacheEntryEventProbe implements BenchmarkProbe { + /** */ + private BenchmarkConfiguration cfg; + + /** Counter. */ + private AtomicLong cnt = new AtomicLong(0); + + /** Collected points. */ + private Collection<BenchmarkProbePoint> collected = new ArrayList<>(); + + /** Query cursor. */ + private QueryCursor qryCur; + + /** Service building probe points. */ + private ExecutorService buildingService; + + /** {@inheritDoc} */ + @Override public void start(BenchmarkDriver drv, BenchmarkConfiguration cfg) throws Exception { + this.cfg = cfg; + + if (drv instanceof IgniteCacheAbstractBenchmark) { + IgniteCacheAbstractBenchmark drv0 = (IgniteCacheAbstractBenchmark)drv; + + if (drv0.cache() != null) { + ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> + events) throws CacheEntryListenerException { + int size = 0; + + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : events) + ++size; + + cnt.addAndGet(size); + } + }); + + qryCur = drv0.cache().query(qry); + + buildingService = Executors.newSingleThreadExecutor(); + + buildingService.submit(new Runnable() { + @Override public void run() { + try { + while (!Thread.currentThread().isInterrupted()) { + Thread.sleep(1000); + + long evts = cnt.getAndSet(0); + + BenchmarkProbePoint pnt = new BenchmarkProbePoint( + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()), + new double[] {evts}); + + collectPoint(pnt); + } + } + catch (InterruptedException e) { + // No-op. + } + } + }); + + println(cfg, getClass().getSimpleName() + " probe is started."); + } + } + + if (qryCur == null) + errorHelp(cfg, "Can not start " + getClass().getSimpleName() + + " probe. Probably, the driver doesn't provide \"cache()\" method."); + } + + /** {@inheritDoc} */ + @Override public void stop() throws Exception { + if (qryCur != null) { + qryCur.close(); + + qryCur = null; + + buildingService.shutdownNow(); + + buildingService.awaitTermination(1, MINUTES); + + println(cfg, getClass().getSimpleName() + " is stopped."); + } + } + + /** {@inheritDoc} */ + @Override public Collection<String> metaInfo() { + return Arrays.asList("Time, sec", "Received events/sec (more is better)"); + } + + /** {@inheritDoc} */ + @Override public synchronized Collection<BenchmarkProbePoint> points() { + Collection<BenchmarkProbePoint> ret = collected; + + collected = new ArrayList<>(ret.size() + 5); + + return ret; + } + + /** {@inheritDoc} */ + @Override public void buildPoint(long time) { + // No-op. + } + + /** + * @param pnt Probe point. + */ + private synchronized void collectPoint(BenchmarkProbePoint pnt) { + collected.add(pnt); + } +}