Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 401efd7c3 -> 92908b91c
IGNITE-901 Added tests. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/92908b91 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/92908b91 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/92908b91 Branch: refs/heads/ignite-901 Commit: 92908b91ce067415af9e6e682b05fc602cc87da9 Parents: 401efd7 Author: nikolay_tikhonov <[email protected]> Authored: Mon Jul 6 15:38:55 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Mon Jul 6 15:38:55 2015 +0300 ---------------------------------------------------------------------- .../IgniteClientReconnectAbstractTest.java | 48 ++- .../IgniteClientReconnectApiBlockTest.java | 23 -- .../IgniteClientReconnectAtomicsTest.java | 138 +++++-- .../IgniteClientReconnectCollectionsTest.java | 63 +++- .../IgniteClientReconnectComputeTest.java | 87 +++-- .../IgniteClientReconnectQueriesTest.java | 28 -- .../IgniteClientReconnectServicesTest.java | 171 +++++++-- .../IgniteClientReconnectStreamerTest.java | 137 ++++++- .../IgniteClientReconnectTestSuite.java | 1 - .../IgniteClientReconnectQueriesTest.java | 356 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 4 + 11 files changed, 885 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index 0f8aadd..99dddf9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -35,7 +35,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.spi.discovery.tcp.messages.*; import org.apache.ignite.testframework.junits.common.*; -import org.eclipse.jetty.util.*; import org.jetbrains.annotations.*; import java.io.*; @@ -212,6 +211,32 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra } /** + * @param e Client disconnected exception. + */ + protected void checkAndWait(Exception e) { + log.info("Expected exception: " + e); + + if (e instanceof IgniteClientDisconnectedException){ + ((IgniteClientDisconnectedException)e).reconnectFuture().get(); + + return; + } + + IgniteClientDisconnectedException discException = X.cause(e, IgniteClientDisconnectedException.class); + + if (discException != null) + discException.reconnectFuture().get(); + + IgniteClientDisconnectedCheckedException discCheckedException = + X.cause(e, IgniteClientDisconnectedCheckedException.class); + + if (discCheckedException != null) + discCheckedException.reconnectFuture().get(); + else + fail("Unexpected exception: " + e); + } + + /** * */ protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi { @@ -242,9 +267,11 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra /** */ volatile Class msgClass; + /** */ AtomicBoolean collectStart = new AtomicBoolean(false); - ConcurrentHashSet<String> classes = new ConcurrentHashSet<>(); + /** */ + ConcurrentHashMap<String, ClusterNode> classes = new ConcurrentHashMap<>(); /** */ @LoggerResource @@ -255,7 +282,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra Class msgClass0 = msgClass; if (collectStart.get() && msg instanceof GridIoMessage) - classes.add(((GridIoMessage)msg).message().getClass().getName()); + classes.put(((GridIoMessage)msg).message().getClass().getName(), node); if (msgClass0 != null && msg instanceof GridIoMessage && ((GridIoMessage)msg).message().getClass().equals(msgClass)) { @@ -280,5 +307,20 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra public void unblockMsg() { msgClass = null; } + + /** + * Start collect messages. + */ + public void start() { + collectStart.set(true); + } + + /** + * Print collected messages. + */ + public void print() { + for (String s : classes.keySet()) + log.error(s); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java index 9aed13b..e056641 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java @@ -510,29 +510,6 @@ public class IgniteClientReconnectApiBlockTest extends IgniteClientReconnectAbst } /** - * @param e Client disconnected exception. - */ - private void checkAndWait(Exception e) { - log.info("Expected exception: " + e); - - if (e instanceof IgniteClientDisconnectedException){ - ((IgniteClientDisconnectedException)e).reconnectFuture().get(); - - return; - } - - if (e.getCause() instanceof IgniteClientDisconnectedException) { - IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause(); - - e0.reconnectFuture().get(); - - return; - } - - fail("Unexpected exception: " + e); - } - - /** * @throws Exception If failed. */ @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java index 884d5f2..91311ef 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java @@ -24,7 +24,7 @@ import org.apache.ignite.testframework.*; import java.util.concurrent.*; /** - * TODO IGNITE-901 create another after removed. + * */ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstractTest { /** {@inheritDoc} */ @@ -66,6 +66,8 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertEquals(1003L, srvAtomicSeq.incrementAndGet()); assertEquals(3L, clientAtomicSeq.incrementAndGet()); + + clientAtomicSeq.close(); } /** @@ -106,6 +108,14 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr return null; } }, IllegalStateException.class, null); + + IgniteAtomicSequence newClientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true); + + assertEquals(0, newClientAtomicSeq.get()); + + assertEquals(1, newClientAtomicSeq.incrementAndGet()); + + newClientAtomicSeq.close(); } /** @@ -132,10 +142,18 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { - for (int i = 0; i < 3000; i++) - clientAtomicSeq.incrementAndGet(); - - return null; + for (int i = 0; i < 3000; i++) { + try { + clientAtomicSeq.incrementAndGet(); + } + catch (Exception e) { + checkAndWait(e); + + return true; + } + } + + return false; } }); @@ -150,13 +168,9 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr commSpi.unblockMsg(); - reconnectClientNode(client, srv, new Runnable() { - @Override public void run() { - // Check that future failed. - assertNotNull(fut.error()); - assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); - } - }); + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); // Check that after reconnect working. assert clientAtomicSeq.incrementAndGet() >= 0; @@ -242,6 +256,21 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr return null; } }, IllegalStateException.class, null); + + IgniteAtomicReference<String> newClientAtomicRef = + client.atomicReference("atomicRefRemoved", "1st value", true); + + IgniteAtomicReference<String> newSrvAtomicRef = srv.atomicReference("atomicRefRemoved", "1st value", false); + + assertEquals("1st value", newClientAtomicRef.get()); + assertTrue(newClientAtomicRef.compareAndSet("1st value", "2st value")); + assertEquals("2st value", newClientAtomicRef.get()); + + assertEquals("2st value", newSrvAtomicRef.get()); + assertTrue(newSrvAtomicRef.compareAndSet("2st value", "3st value")); + assertEquals("3st value", newSrvAtomicRef.get()); + + newClientAtomicRef.close(); } /** @@ -273,7 +302,16 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { - return clientAtomicRef.compareAndSet("3st value", "4st value"); + try { + clientAtomicRef.compareAndSet("3st value", "4st value"); + } + catch (Exception e) { + checkAndWait(e); + + return true; + } + + return false; } }); @@ -288,13 +326,9 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr servCommSpi.unblockMsg(); - reconnectClientNode(client, srv, new Runnable() { - @Override public void run() { - // Check that future failed. - assertNotNull(fut.error()); - assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); - } - }); + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); // Check that after reconnect working. assertEquals("3st value", clientAtomicRef.get()); @@ -384,6 +418,20 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr return null; } }, IllegalStateException.class, null); + + IgniteAtomicStamped newClientAtomicStamped = client.atomicStamped("atomicStampedRemoved", 0, 0, true); + + assertEquals(true, newClientAtomicStamped.compareAndSet(0, 1, 0, 1)); + assertEquals(1, newClientAtomicStamped.value()); + assertEquals(1, newClientAtomicStamped.stamp()); + + IgniteAtomicStamped newSrvAtomicStamped = srv.atomicStamped("atomicStampedRemoved", 0, 0, false); + + assertEquals(true, newSrvAtomicStamped.compareAndSet(1, 2, 1, 2)); + assertEquals(2, newSrvAtomicStamped.value()); + assertEquals(2, newSrvAtomicStamped.stamp()); + + newClientAtomicStamped.close(); } /** @@ -414,7 +462,16 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { - return clientAtomicStamped.compareAndSet(2, 3, 2, 3); + try { + clientAtomicStamped.compareAndSet(2, 3, 2, 3); + } + catch (Exception e) { + checkAndWait(e); + + return true; + } + + return false; } }); @@ -429,13 +486,9 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr servCommSpi.unblockMsg(); - reconnectClientNode(client, srv, new Runnable() { - @Override public void run() { - // Check that future failed. - assertNotNull(fut.error()); - assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); - } - }); + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); // Check that after reconnect working. assertEquals(true, clientAtomicStamped.compareAndSet(2, 3, 2, 3)); @@ -511,6 +564,14 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr return null; } }, IllegalStateException.class, null); + + IgniteAtomicLong newClientAtomicLong = client.atomicLong("atomicLongRmv", 0, true); + + assertEquals(0L, newClientAtomicLong.getAndAdd(1)); + + IgniteAtomicLong newSrvAtomicLong = srv.atomicLong("atomicLongRmv", 0, false); + + assertEquals(1L, newSrvAtomicLong.getAndAdd(1)); } /** @@ -533,7 +594,16 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { - return clientAtomicLong.getAndAdd(1); + try { + clientAtomicLong.getAndAdd(1); + } + catch (Exception e) { + checkAndWait(e); + + return true; + } + + return false; } }); @@ -548,13 +618,9 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr commSpi.unblockMsg(); - reconnectClientNode(client, srv, new Runnable() { - @Override public void run() { - // Check that future failed. - assertNotNull(fut.error()); - assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); - } - }); + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); // Check that after reconnect working. assertEquals(1, clientAtomicLong.addAndGet(1)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java index 77e9c03..f24a8ae 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java @@ -29,7 +29,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.*; import static org.apache.ignite.cache.CacheMode.*; /** - * TODO IGNITE-901 create another after removed. + * */ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectAbstractTest { /** {@inheritDoc} */ @@ -224,6 +224,16 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA return null; } }, IllegalStateException.class, null); + + IgniteSet<String> newClientSet = client.set(setName, colCfg); + + IgniteSet<String> newSrvSet = srv.set(setName, null); + + assertTrue(newClientSet.add("1")); + + assertFalse(newSrvSet.add("1")); + + newSrvSet.close(); } /** @@ -256,7 +266,17 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { - return clientSet.add("2"); + try { + for (int i = 0; i < 100; i++) + clientSet.add("2"); + } + catch (Exception e) { + checkAndWait(e); + + return true; + } + + return false; } }); @@ -271,13 +291,9 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA commSpi.unblockMsg(); - reconnectClientNode(client, srv, new Runnable() { - @Override public void run() { - // Check that future failed. - assertNotNull("Future was not failed. Atomic mode: " + colCfg.getAtomicityMode() + ".", fut.error()); - assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); - } - }); + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); assertTrue(clientSet.add("3")); @@ -352,6 +368,14 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA return null; } }, IllegalStateException.class, null); + + IgniteQueue<String> newClientQueue = client.queue(setName, 10, colCfg); + + IgniteQueue<String> newSrvQueue = srv.queue(setName, 10, null); + + assertTrue(newClientQueue.add("1")); + + assertTrue(newSrvQueue.add("2")); } /** @@ -384,7 +408,16 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { - return clientQueue.add("2"); + try { + clientQueue.add("2"); + } + catch (Exception e) { + checkAndWait(e); + + return true; + } + + return false; } }); @@ -399,13 +432,9 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA commSpi.unblockMsg(); - reconnectClientNode(client, srv, new Runnable() { - @Override public void run() { - // Check that future failed. - assertNotNull("Future was not failed. Atomic mode: " + colCfg.getAtomicityMode() + ".", fut.error()); - assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); - } - }); + reconnectClientNode(client, srv, null); + + assertTrue("Future was not failed. Atomic mode: " + colCfg.getAtomicityMode() + ".", (Boolean)fut.get()); assertTrue(clientQueue.add("3")); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java index 186459e..212fdc1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java @@ -59,11 +59,20 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { - return client.compute().affinityCall("test-cache", 40, new IgniteCallable<Object>() { - @Override public Integer call() throws Exception { - return 42; - } - }); + try { + client.compute().affinityCall("test-cache", 40, new IgniteCallable<Object>() { + @Override public Integer call() throws Exception { + return 42; + } + }); + } + catch (Exception e) { + checkAndWait(e); + + return true; + } + + return false; } }); @@ -78,13 +87,9 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr commSpi.unblockMsg(); - reconnectClientNode(client, srv, new Runnable() { - @Override public void run() { - // Check that future failed. - assertNotNull(fut.error()); - assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); - } - }); + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); } /** @@ -103,11 +108,20 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { - return client.compute().broadcast(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return 42; - } - }); + try { + client.compute().broadcast(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return 42; + } + }); + } + catch (Exception e) { + checkAndWait(e); + + return true; + } + + return false; } }); @@ -122,13 +136,9 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr commSpi.unblockMsg(); - reconnectClientNode(client, srv, new Runnable() { - @Override public void run() { - // Check that future failed. - assertNotNull(fut.error()); - assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); - } - }); + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); } /** @@ -147,11 +157,20 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { - return client.compute().apply(new IgniteClosure<Integer, Integer>() { - @Override public Integer apply(Integer o) { - return o + 1; - } - }, Arrays.asList(1, 2, 3)); + try { + client.compute().apply(new IgniteClosure<Integer, Integer>() { + @Override public Integer apply(Integer o) { + return o + 1; + } + }, Arrays.asList(1, 2, 3)); + } + catch (Exception e) { + checkAndWait(e); + + return true; + } + + return false; } }); @@ -166,12 +185,8 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr commSpi.unblockMsg(); - reconnectClientNode(client, srv, new Runnable() { - @Override public void run() { - // Check that future failed. - assertNotNull(fut.error()); - assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); - } - }); + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectQueriesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectQueriesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectQueriesTest.java deleted file mode 100644 index 813d06c..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectQueriesTest.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -/** - * TODO IGNITE-901. - */ -public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstractTest { - /** {@inheritDoc} */ - @Override protected int serverCount() { - return 1; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java index e79f2aa..681efa4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java @@ -18,19 +18,16 @@ package org.apache.ignite.internal; import org.apache.ignite.*; -import org.apache.ignite.events.*; +import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.processors.service.*; -import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.apache.ignite.services.*; +import org.apache.ignite.testframework.*; import java.util.concurrent.*; -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.events.EventType.*; - /** - * TODO IGNITE-901: fail after disconnect, disconnect when operation in progress, service deployed on client. + * */ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbstractTest { /** {@inheritDoc} */ @@ -53,9 +50,9 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst IgniteServices services = client.services(); - services.deployClusterSingleton("s1", new TestServiceImpl()); + services.deployClusterSingleton("testReconnect", new TestServiceImpl()); - TestService srvc = services.serviceProxy("s1", TestService.class, false); + TestService srvc = services.serviceProxy("testReconnect", TestService.class, false); assertNotNull(srvc); @@ -63,37 +60,159 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); + reconnectClientNode(client, srv, null); + + CountDownLatch latch = new CountDownLatch(1); + + DummyService.exeLatch("testReconnect2", latch); + + services.deployClusterSingleton("testReconnect2", new DummyService()); + + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + + assertEquals((Object) 4L, srvc.test()); + } + + /** + * @throws Exception If failed. + */ + public void testServiceRemove() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); - final CountDownLatch reconnectLatch = new CountDownLatch(1); + IgniteServices clnServices = client.services(); - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) - info("Disconnected: " + evt); - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); + final IgniteServices srvServices = srv.services(); - reconnectLatch.countDown(); + srvServices.deployClusterSingleton("testServiceRemove", new TestServiceImpl()); + + final TestService srvc = clnServices.serviceProxy("testServiceRemove", TestService.class, false); + + assertNotNull(srvc); + + assertNotNull(srvc.test()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvServices.cancel("testServiceRemove"); + } + }); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return srvc.test(); + } + }, IgniteException.class, null); + + clnServices.deployClusterSingleton("testServiceRemove", new TestServiceImpl()); + + TestService newSrvc = clnServices.serviceProxy("testServiceRemove", TestService.class, false); + + assertNotNull(newSrvc); + assertNotNull(newSrvc.test()); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectInDeploying() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + final IgniteServices services = client.services(); + + Ignite srv = clientRouter(client); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + commSpi.blockMsg(GridContinuousMessage.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + services.deployClusterSingleton("testReconnectInDeploying", new TestServiceImpl()); + } + catch (Exception e) { + checkAndWait(e); + + return true; } - return true; + return false; } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + }); - srvSpi.failNode(client.cluster().localNode().id(), null); + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + assertNotDone(fut); - CountDownLatch latch = new CountDownLatch(1); + commSpi.unblockMsg(); - DummyService.exeLatch("s2", latch); + reconnectClientNode(client, srv, null); - services.deployClusterSingleton("s2", new DummyService()); + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); + } - assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + /** + * @throws Exception If failed. + */ + public void testReconnectInProgress() throws Exception { + Ignite client = grid(serverCount()); - assertEquals((Object) 4L, srvc.test()); + assertTrue(client.cluster().localNode().isClient()); + + final IgniteServices services = client.services(); + + final Ignite srv = clientRouter(client); + + services.deployClusterSingleton("testReconnectInProgress", new TestServiceImpl()); + + final TestService srvc = services.serviceProxy("testReconnectInProgress", TestService.class, false); + + assertNotNull(srvc); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + commSpi.blockMsg(GridJobExecuteResponse.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + srvc.test(); + } + catch (Exception e) { + checkAndWait(e); + + return true; + } + + return false; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMsg(); + + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java index 1fa5e73..8112ce5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java @@ -17,12 +17,147 @@ package org.apache.ignite.internal; +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.datastreamer.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.testframework.*; + +import java.util.concurrent.*; + /** - * TODO IGNITE-901. + * */ public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbstractTest { + /** */ + public static final String CACHE_NAME = "streamer"; + /** {@inheritDoc} */ @Override protected int serverCount() { return 1; } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>(CACHE_NAME) + .setAtomicityMode(CacheAtomicityMode.ATOMIC) + .setCacheMode(CacheMode.PARTITIONED); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testStreamerReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final IgniteCache<Object, Object> srvCache = srv.cache(CACHE_NAME); + + IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer("streamer"); + + for (int i = 0; i < 50; i++) + streamer.addData(i, i); + + streamer.flush(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return srvCache.localSize() == 50; + } + }, 2000L); + reconnectClientNode(client, srv, null); + + for (int i = 0; i < 50; i++) + streamer.addData(i, i); + + streamer.flush(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return srvCache.localSize() == 100; + } + }, 2000L); + + streamer.close(); + + streamer.future().get(2, TimeUnit.SECONDS); + + srvCache.removeAll(); + } + + /** + * @throws Exception If failed. + */ + public void testStreamerReconnectInProgress() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final IgniteCache<Object, Object> srvCache = srv.cache(CACHE_NAME); + + final IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer("streamer"); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + commSpi.blockMsg(DataStreamerResponse.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + for (int i = 0; i < 50; i++) + streamer.addData(i, i); + + streamer.flush(); + } + catch (Exception e) { + checkAndWait(e); + + return true; + } + + return false; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMsg(); + + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); + + for (int i = 0; i < 50; i++) + streamer.addData(i, i); + + streamer.flush(); + + assertTrue(srv.cache(CACHE_NAME).localSize() >= 0); + + srvCache.removeAll(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java index 17e33c7..fb41f0f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java @@ -41,7 +41,6 @@ public class IgniteClientReconnectTestSuite extends TestSuite { suite.addTestSuite(IgniteClientReconnectAtomicsTest.class); suite.addTestSuite(IgniteClientReconnectCollectionsTest.class); suite.addTestSuite(IgniteClientReconnectServicesTest.class); - suite.addTestSuite(IgniteClientReconnectQueriesTest.class); suite.addTestSuite(IgniteClientReconnectStreamerTest.class); return suite; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java new file mode 100644 index 0000000..f75e780 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.*; + +import javax.cache.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * + */ +public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstractTest { + /** */ + public static final String QUERY_CACHE = "query"; + + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration<Integer, Person> ccfg = new CacheConfiguration<Integer, Person>(QUERY_CACHE) + .setCacheMode(CacheMode.PARTITIONED) + .setAtomicityMode(CacheAtomicityMode.ATOMIC) + .setIndexedTypes(Integer.class, Person.class); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testQueryReconnect() throws Exception { + Ignite cln = grid(serverCount()); + + assertTrue(cln.cluster().localNode().isClient()); + + final Ignite srv = clientRouter(cln); + + final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE); + + final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE); + + clnCache.removeAll(); + + clnCache.put(1, new Person(1, "name1", "surname1")); + clnCache.put(2, new Person(2, "name2", "surname2")); + clnCache.put(3, new Person(3, "name3", "surname3")); + + SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key<>0"); + qry.setPageSize(1); + + QueryCursor<Cache.Entry<Integer, Person>> queryCursor = clnCache.query(qry); + + reconnectClientNode(cln, srv, new Runnable() { + @Override public void run() { + srvCache.put(4, new Person(4, "name4", "surname4")); + } + }); + + List<Cache.Entry<Integer, Person>> result = queryCursor.getAll(); + + assertNotNull(result); + assertEquals(4, result.size()); + } + + /** + * @throws Exception If failed. + */ + public void testQueryReconnectInProgress() throws Exception { + Ignite cln = grid(serverCount()); + + assertTrue(cln.cluster().localNode().isClient()); + + final Ignite srv = clientRouter(cln); + + final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE); + + clnCache.removeAll(); + + clnCache.put(1, new Person(1, "name1", "surname1")); + clnCache.put(2, new Person(2, "name2", "surname2")); + clnCache.put(3, new Person(3, "name3", "surname3")); + + final SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key<>0"); + qry.setPageSize(1); + + final QueryCursor<Cache.Entry<Integer, Person>> queryCursor = clnCache.query(qry); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + commSpi.blockMsg(GridQueryNextPageResponse.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + queryCursor.getAll(); + } + catch (Exception e) { + checkAndWait(e); + + return true; + } + + return false; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMsg(); + + reconnectClientNode(cln, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); + } + /** + * @throws Exception If failed. + */ + public void testScanQueryReconnect() throws Exception { + Ignite cln = grid(serverCount()); + + assertTrue(cln.cluster().localNode().isClient()); + + final Ignite srv = clientRouter(cln); + + final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE); + + final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE); + + clnCache.removeAll(); + + clnCache.put(1, new Person(1, "name1", "surname1")); + clnCache.put(2, new Person(2, "name2", "surname2")); + clnCache.put(3, new Person(3, "name3", "surname3")); + + ScanQuery<Integer, Person> scanQry = new ScanQuery<>(); + + scanQry.setPageSize(1); + scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() { + @Override public boolean apply(Integer integer, Person person) { + return true; + } + }); + + final QueryCursor<Cache.Entry<Integer, Person>> queryCursor = clnCache.query(scanQry); + + reconnectClientNode(cln, srv, new Runnable() { + @Override public void run() { + srvCache.put(4, new Person(4, "name4", "surname4")); + } + }); + + IgniteInternalFuture<List<Cache.Entry<Integer, Person>>> f = GridTestUtils + .runAsync(new Callable<List<Cache.Entry<Integer, Person>>>() { + @Override public List<Cache.Entry<Integer, Person>> call() throws Exception { + return queryCursor.getAll(); + } + }); + + List<Cache.Entry<Integer, Person>> result = f.get(2, TimeUnit.SECONDS); + + assertEquals(4, result.size()); + } + + /** + * @throws Exception If failed. + */ + public void testScanQueryReconnectInProgress() throws Exception { + Ignite cln = grid(serverCount()); + + assertTrue(cln.cluster().localNode().isClient()); + + final Ignite srv = clientRouter(cln); + + final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE); + + clnCache.put(1, new Person(1, "name1", "surname1")); + clnCache.put(2, new Person(2, "name2", "surname2")); + clnCache.put(3, new Person(3, "name3", "surname3")); + + ScanQuery<Integer, Person> scanQry = new ScanQuery<>(); + + scanQry.setPageSize(1); + scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() { + @Override public boolean apply(Integer integer, Person person) { + return true; + } + }); + + final QueryCursor<Cache.Entry<Integer, Person>> queryCursor = clnCache.query(scanQry); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + commSpi.blockMsg(GridCacheQueryResponse.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + queryCursor.getAll(); + } + catch (Exception e) { + checkAndWait(e); + + return true; + } + + return false; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMsg(); + + reconnectClientNode(cln, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); + } + + /** + * + */ + public static class Person { + /** */ + @QuerySqlField + public int id; + + /** */ + @QuerySqlField + public String name; + + /** */ + @QuerySqlField + public String surname; + + /** + * @param id Id. + * @param name Name. + * @param surname Surname. + */ + public Person(int id, String name, String surname) { + this.id = id; + this.name = name; + this.surname = surname; + } + + /** + * @return Id. + */ + public int getId() { + return id; + } + + /** + * @param id Set id. + */ + public void setId(int id) { + this.id = id; + } + + /** + * @return Name. + */ + public String getName() { + return name; + } + + /** + * @param name Name. + */ + public void setName(String name) { + this.name = name; + } + + /** + * @return Surname. + */ + public String getSurname() { + return surname; + } + + /** + * @param surname Surname. + */ + public void setSurname(String surname) { + this.surname = surname; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + return this == o || !(o == null || getClass() != o.getClass()) && id == ((Person)o).id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Person.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 181ff0c..9f0f850 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.*; +import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.distributed.replicated.*; @@ -122,6 +123,9 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { //Unmarshallig query test. suite.addTestSuite(IgniteCacheP2pUnmarshallingQueryErrorTest.class); + // Reconnect client query test. + suite.addTestSuite(IgniteClientReconnectQueriesTest.class); + return suite; } }
