ignite-1758
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e28b876e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e28b876e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e28b876e Branch: refs/heads/ignite-1758 Commit: e28b876ee2ff3794d41e2ff4be9a3f2c0a7e0453 Parents: cac075c Author: sboikov <[email protected]> Authored: Wed Oct 28 17:12:12 2015 +0300 Committer: sboikov <[email protected]> Committed: Wed Oct 28 17:12:12 2015 +0300 ---------------------------------------------------------------------- .../communication/tcp/TcpCommunicationSpi.java | 6 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 1 - .../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- ...gniteClientReconnectMassiveShutdownTest.java | 304 ------------------- ...gniteClientReconnectMassiveShutdownTest.java | 304 +++++++++++++++++++ .../tcp/TcpDiscoveryMultiThreadedTest.java | 54 +++- .../testsuites/IgniteClientNodesTestSuite.java | 8 +- 7 files changed, 366 insertions(+), 313 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e28b876e/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 99c6145..e8bd8a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -3114,10 +3114,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter for (ClientKey id : left) { GridNioRecoveryDescriptor recoverySnd = recoveryDescs.get(id); - if (recoverySnd != null) { - if (recoverySnd.onNodeLeft()) - recoveryDescs.remove(id); - } + if (recoverySnd != null && recoverySnd.onNodeLeft()) + recoveryDescs.remove(id); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e28b876e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 9cadca1..1cc93aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -98,7 +98,6 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; http://git-wip-us.apache.org/repos/asf/ignite/blob/e28b876e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 7383cd5..6254605 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1956,7 +1956,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * <p> * This method is intended for test purposes only. */ - public void simulateNodeFailure() { + void simulateNodeFailure() { impl.simulateNodeFailure(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e28b876e/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectMassiveShutdownTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectMassiveShutdownTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectMassiveShutdownTest.java deleted file mode 100644 index 54dd73a..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectMassiveShutdownTest.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import java.util.HashMap; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.cache.CacheException; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteClientDisconnectedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteTransactions; -import org.apache.ignite.cluster.ClusterTopologyException; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.ignite.transactions.Transaction; - -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; - -/** - * Client reconnect test in multi threaded mode while cache operations are in progress. - */ -public class IgniteClientReconnectMassiveShutdownTest extends GridCommonAbstractTest { - /** */ - private static final int GRID_CNT = 14; - - /** */ - private static final int CLIENT_GRID_CNT = 14; - - /** */ - private static volatile boolean clientMode; - - /** */ - private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setClientMode(clientMode); - - cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder)); - - ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - - super.afterTest(); - - Thread.sleep(5000); - } - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return 5 * 60 * 1000; - } - - /** - * @throws Exception If any error occurs. - */ - public void _testMassiveServersShutdown1() throws Exception { - massiveServersShutdown(StopType.FAIL_EVENT); - } - - /** - * @throws Exception If any error occurs. - */ - public void testMassiveServersShutdown2() throws Exception { - massiveServersShutdown(StopType.SIMULATE_FAIL); - } - - /** - * @throws Exception If any error occurs. - */ - public void testMassiveServersShutdown3() throws Exception { - massiveServersShutdown(StopType.CLOSE); - } - - /** - * @param stopType How tp stop node. - * @throws Exception If any error occurs. - */ - private void massiveServersShutdown(final StopType stopType) throws Exception { - clientMode = false; - - startGridsMultiThreaded(GRID_CNT); - - clientMode = true; - - startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); - - final AtomicBoolean done = new AtomicBoolean(); - - // Starting a cache dynamically. - Ignite client = grid(GRID_CNT); - - assertTrue(client.configuration().isClientMode()); - - CacheConfiguration<String, Integer> cfg = new CacheConfiguration<>(); - - cfg.setCacheMode(PARTITIONED); - cfg.setAtomicityMode(TRANSACTIONAL); - cfg.setBackups(2); - cfg.setOffHeapMaxMemory(0); - cfg.setMemoryMode(OFFHEAP_TIERED); - - IgniteCache<String, Integer> cache = client.getOrCreateCache(cfg); - - HashMap<String, Integer> put = new HashMap<>(); - - // Load some data. - for (int i = 0; i < 10_000; i++) - put.put(String.valueOf(i), i); - - cache.putAll(put); - - // Preparing client nodes and starting cache operations from them. - final BlockingQueue<Integer> clientIdx = new LinkedBlockingQueue<>(); - - for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++) - clientIdx.add(i); - - IgniteInternalFuture<?> clientsFut = multithreadedAsync( - new Callable<Object>() { - @Override public Object call() throws Exception { - int idx = clientIdx.take(); - - Ignite ignite = grid(idx); - - Thread.currentThread().setName("client-thread-" + ignite.name()); - - assertTrue(ignite.configuration().isClientMode()); - - IgniteCache<String, Integer> cache = ignite.cache(null); - - IgniteTransactions txs = ignite.transactions(); - - Random rand = new Random(); - - while (!done.get()) { - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - cache.put(String.valueOf(rand.nextInt(10_000)), rand.nextInt(50_000)); - - tx.commit(); - } - catch (ClusterTopologyException ex) { - ex.retryReadyFuture().get(); - } - catch (IgniteException | CacheException e) { - if (X.hasCause(e, IgniteClientDisconnectedException.class)) { - IgniteClientDisconnectedException cause = X.cause(e, - IgniteClientDisconnectedException.class); - - assert cause != null; - - cause.reconnectFuture().get(); - } - else if (X.hasCause(e, ClusterTopologyException.class)) { - ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class); - - assert cause != null; - - cause.retryReadyFuture().get(); - } - else - throw e; - } - } - - return null; - } - }, - CLIENT_GRID_CNT); - - try { - // Killing a half of server nodes. - final int srvsToKill = GRID_CNT / 2; - - final BlockingQueue<Integer> victims = new LinkedBlockingQueue<>(); - - for (int i = 0; i < srvsToKill; i++) - victims.add(i); - - final BlockingQueue<Integer> assassins = new LinkedBlockingQueue<>(); - - for (int i = srvsToKill; i < GRID_CNT; i++) - assassins.add(i); - - IgniteInternalFuture<?> srvsShutdownFut = multithreadedAsync( - new Callable<Object>() { - @Override public Object call() throws Exception { - Thread.sleep(5_000); - - Ignite assassin = grid(assassins.take()); - - assertFalse(assassin.configuration().isClientMode()); - - Ignite victim = grid(victims.take()); - - assertFalse(victim.configuration().isClientMode()); - - log.info("Kill node [node=" + victim.name() + ", from=" + assassin.name() + ']'); - - switch (stopType) { - case CLOSE: - victim.close(); - - break; - - case FAIL_EVENT: - UUID nodeId = victim.cluster().localNode().id(); - - assassin.configuration().getDiscoverySpi().failNode(nodeId, null); - - break; - - case SIMULATE_FAIL: - ((TcpDiscoverySpi)victim.configuration().getDiscoverySpi()).simulateNodeFailure(); - - break; - - default: - fail(); - } - - return null; - } - }, - assassins.size() - ); - - srvsShutdownFut.get(); - - Thread.sleep(15_000); - - done.set(true); - - clientsFut.get(); - - awaitPartitionMapExchange(); - - for (int k = 0; k < 10_000; k++) { - String key = String.valueOf(k); - - Object val = cache.get(key); - - for (int i = srvsToKill; i < GRID_CNT; i++) - assertEquals(val, ignite(i).cache(null).get(key)); - } - } - finally { - done.set(true); - } - } - - /** - * - */ - enum StopType { - /** */ - CLOSE, - - /** */ - SIMULATE_FAIL, - - /** */ - FAIL_EVENT - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/e28b876e/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java new file mode 100644 index 0000000..6fc29d0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import java.util.HashMap; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Client reconnect test in multi threaded mode while cache operations are in progress. + */ +public class IgniteClientReconnectMassiveShutdownTest extends GridCommonAbstractTest { + /** */ + private static final int GRID_CNT = 14; + + /** */ + private static final int CLIENT_GRID_CNT = 14; + + /** */ + private static volatile boolean clientMode; + + /** */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setClientMode(clientMode); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder)); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + + Thread.sleep(5000); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60 * 1000; + } + + /** + * @throws Exception If any error occurs. + */ + public void _testMassiveServersShutdown1() throws Exception { + massiveServersShutdown(StopType.FAIL_EVENT); + } + + /** + * @throws Exception If any error occurs. + */ + public void testMassiveServersShutdown2() throws Exception { + massiveServersShutdown(StopType.SIMULATE_FAIL); + } + + /** + * @throws Exception If any error occurs. + */ + public void _testMassiveServersShutdown3() throws Exception { + massiveServersShutdown(StopType.CLOSE); + } + + /** + * @param stopType How tp stop node. + * @throws Exception If any error occurs. + */ + private void massiveServersShutdown(final StopType stopType) throws Exception { + clientMode = false; + + startGridsMultiThreaded(GRID_CNT); + + clientMode = true; + + startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); + + final AtomicBoolean done = new AtomicBoolean(); + + // Starting a cache dynamically. + Ignite client = grid(GRID_CNT); + + assertTrue(client.configuration().isClientMode()); + + CacheConfiguration<String, Integer> cfg = new CacheConfiguration<>(); + + cfg.setCacheMode(PARTITIONED); + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setBackups(2); + cfg.setOffHeapMaxMemory(0); + cfg.setMemoryMode(OFFHEAP_TIERED); + + IgniteCache<String, Integer> cache = client.getOrCreateCache(cfg); + + HashMap<String, Integer> put = new HashMap<>(); + + // Load some data. + for (int i = 0; i < 10_000; i++) + put.put(String.valueOf(i), i); + + cache.putAll(put); + + // Preparing client nodes and starting cache operations from them. + final BlockingQueue<Integer> clientIdx = new LinkedBlockingQueue<>(); + + for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++) + clientIdx.add(i); + + IgniteInternalFuture<?> clientsFut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + int idx = clientIdx.take(); + + Ignite ignite = grid(idx); + + Thread.currentThread().setName("client-thread-" + ignite.name()); + + assertTrue(ignite.configuration().isClientMode()); + + IgniteCache<String, Integer> cache = ignite.cache(null); + + IgniteTransactions txs = ignite.transactions(); + + Random rand = new Random(); + + while (!done.get()) { + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(String.valueOf(rand.nextInt(10_000)), rand.nextInt(50_000)); + + tx.commit(); + } + catch (ClusterTopologyException ex) { + ex.retryReadyFuture().get(); + } + catch (IgniteException | CacheException e) { + if (X.hasCause(e, IgniteClientDisconnectedException.class)) { + IgniteClientDisconnectedException cause = X.cause(e, + IgniteClientDisconnectedException.class); + + assert cause != null; + + cause.reconnectFuture().get(); + } + else if (X.hasCause(e, ClusterTopologyException.class)) { + ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class); + + assert cause != null; + + cause.retryReadyFuture().get(); + } + else + throw e; + } + } + + return null; + } + }, + CLIENT_GRID_CNT); + + try { + // Killing a half of server nodes. + final int srvsToKill = GRID_CNT / 2; + + final BlockingQueue<Integer> victims = new LinkedBlockingQueue<>(); + + for (int i = 0; i < srvsToKill; i++) + victims.add(i); + + final BlockingQueue<Integer> assassins = new LinkedBlockingQueue<>(); + + for (int i = srvsToKill; i < GRID_CNT; i++) + assassins.add(i); + + IgniteInternalFuture<?> srvsShutdownFut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.sleep(5_000); + + Ignite assassin = grid(assassins.take()); + + assertFalse(assassin.configuration().isClientMode()); + + Ignite victim = grid(victims.take()); + + assertFalse(victim.configuration().isClientMode()); + + log.info("Kill node [node=" + victim.name() + ", from=" + assassin.name() + ']'); + + switch (stopType) { + case CLOSE: + victim.close(); + + break; + + case FAIL_EVENT: + UUID nodeId = victim.cluster().localNode().id(); + + assassin.configuration().getDiscoverySpi().failNode(nodeId, null); + + break; + + case SIMULATE_FAIL: + ((TcpDiscoverySpi)victim.configuration().getDiscoverySpi()).simulateNodeFailure(); + + break; + + default: + fail(); + } + + return null; + } + }, + assassins.size() + ); + + srvsShutdownFut.get(); + + Thread.sleep(15_000); + + done.set(true); + + clientsFut.get(); + + awaitPartitionMapExchange(); + + for (int k = 0; k < 10_000; k++) { + String key = String.valueOf(k); + + Object val = cache.get(key); + + for (int i = srvsToKill; i < GRID_CNT; i++) + assertEquals(val, ignite(i).cache(null).get(key)); + } + } + finally { + done.set(true); + } + } + + /** + * + */ + enum StopType { + /** */ + CLOSE, + + /** */ + SIMULATE_FAIL, + + /** */ + FAIL_EVENT + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e28b876e/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index fcb0116..0c38ee3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -17,6 +17,9 @@ package org.apache.ignite.spi.discovery.tcp; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CyclicBarrier; @@ -28,18 +31,25 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.client.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_TASK_FAILED; import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED; @@ -57,8 +67,14 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { private static final ThreadLocal<Boolean> clientFlagPerThread = new ThreadLocal<>(); /** */ + private static final ThreadLocal<UUID> nodeId = new ThreadLocal<>(); + + /** */ private static volatile boolean clientFlagGlobal; + /** */ + private static GridConcurrentHashSet<UUID> failedNodes = new GridConcurrentHashSet<>(); + /** * @return Client node flag. */ @@ -83,6 +99,14 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + UUID id = nodeId.get(); + + if (id != null) { + cfg.setNodeId(id); + + nodeId.set(null); + } + if (client()) cfg.setClientMode(true); @@ -91,6 +115,22 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { setJoinTimeout(60_000). setNetworkTimeout(60_000)); + int[] evts = {EVT_NODE_FAILED, EVT_NODE_LEFT}; + + Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>(); + + lsnrs.put(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt; + + failedNodes.add(discoveryEvt.eventNode().id()); + + return true; + } + }, evts); + + cfg.setLocalEventListeners(lsnrs); + cfg.setCacheConfiguration(); cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); @@ -105,6 +145,8 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { stopAllGrids(); super.afterTest(); + + failedNodes.clear(); } /** {@inheritDoc} */ @@ -205,6 +247,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { log.info("Start client: " + startIdx); + UUID id = UUID.randomUUID(); + + nodeId.set(id); + try { Ignite ignite = startGrid(startIdx); @@ -218,8 +264,12 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) || X.hasCause(e, IgniteClientDisconnectedException.class)) log.info("Client disconnected: " + e); - else - throw e; + else { + if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class)) + log.info("Client failed: " + e); + else + throw e; + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e28b876e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientNodesTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientNodesTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientNodesTestSuite.java index 0055954..1558a7c 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientNodesTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientNodesTestSuite.java @@ -18,7 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; -import org.apache.ignite.internal.IgniteClientReconnectMassiveShutdownTest; +import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientNodeConcurrentStart; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientReconnectTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheManyClientsTest; @@ -34,6 +34,12 @@ public class IgniteClientNodesTestSuite extends TestSuite { public static TestSuite suite() throws Exception { TestSuite suite = new TestSuite("Ignite Client Nodes Reconnect Test Suite"); + suite.addTest(IgniteClientReconnectTestSuite.suite()); + + suite.addTestSuite(IgniteCacheManyClientsTest.class); + suite.addTestSuite(IgniteCacheClientNodeConcurrentStart.class); + suite.addTestSuite(IgniteCacheClientReconnectTest.class); + suite.addTestSuite(IgniteClientReconnectMassiveShutdownTest.class); return suite;
