Repository: ignite Updated Branches: refs/heads/master a1897dfd1 -> 2b23d46fe
IGNITE-9493 Do not call communication error resolver in case of client node failed Signed-off-by: Pavel Kovalenko <jokse...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2b23d46f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2b23d46f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2b23d46f Branch: refs/heads/master Commit: 2b23d46feba75b1eb51c8373c730b20504d1b30a Parents: a1897df Author: zstan <stanilov...@gmail.com> Authored: Tue Dec 25 12:01:34 2018 +0300 Committer: Pavel Kovalenko <jokse...@gmail.com> Committed: Tue Dec 25 12:01:34 2018 +0300 ---------------------------------------------------------------------- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 9 +- .../zk/ZookeeperDiscoverySpiMBean.java | 8 ++ .../zk/internal/ZookeeperDiscoveryImpl.java | 5 + .../internal/ZookeeperDiscoveryStatistics.java | 21 ++++- .../zk/internal/ZookeeperDiscoverySpiTest.java | 96 +++++++++++++++++++- 5 files changed, 132 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2b23d46f/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 222b73b..287bd45 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -595,12 +595,17 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements IgniteDis } /** {@inheritDoc} */ - @Nullable @Override public UUID getCoordinator() { + @Override public long 
getCommErrorProcNum() { + return stats.commErrorCount(); + } + + /** {@inheritDoc} */ + @Override public @Nullable UUID getCoordinator() { return impl.getCoordinator(); } /** {@inheritDoc} */ - @Nullable @Override public String getCoordinatorNodeFormatted() { + @Override public @Nullable String getCoordinatorNodeFormatted() { return String.valueOf(impl.node(impl.getCoordinator())); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2b23d46f/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiMBean.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiMBean.java index 1eed0b4..05a3dc2 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiMBean.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiMBean.java @@ -58,6 +58,14 @@ public interface ZookeeperDiscoverySpiMBean extends IgniteSpiManagementMBean, Di public String getZkSessionId(); /** + * Gets the number of times the communication error resolver was called. + * + * @return Number of communication error resolver calls. + */ + @MXBeanDescription("Communication error resolver call count.") + public long getCommErrorProcNum(); + + /** * Gets root path in ZooKeeper cluster Zk client uses to put data to. * * @return Zk Root Path. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b23d46f/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index fa218ff..d57c8d6 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -300,6 +300,9 @@ public class ZookeeperDiscoveryImpl { * @param err Connect error. */ public void resolveCommunicationError(ClusterNode node0, Exception err) { + if (node0.isClient()) + return; + ZookeeperClusterNode node = node(node0.id()); if (node == null) @@ -317,6 +320,8 @@ public class ZookeeperDiscoveryImpl { this, node.sessionTimeout() + 1000); + stats.onCommunicationError(); + if (commErrProcFut.compareAndSet(fut, newFut)) { fut = newFut; http://git-wip-us.apache.org/repos/asf/ignite/blob/2b23d46f/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java index 678cf11..cc95dd3 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java @@ -23,22 +23,30 @@ import org.apache.ignite.internal.util.typedef.internal.S; */ public class 
ZookeeperDiscoveryStatistics { /** */ - private int joinedNodesCnt; + private long joinedNodesCnt; /** */ - private int failedNodesCnt; + private long failedNodesCnt; + + /** Communication error count. */ + private long commErrCnt; /** */ - public int joinedNodesCnt() { + public long joinedNodesCnt() { return joinedNodesCnt; } /** */ - public int failedNodesCnt() { + public long failedNodesCnt() { return failedNodesCnt; } /** */ + public long commErrorCount() { + return commErrCnt; + } + + /** */ public void onNodeJoined() { joinedNodesCnt++; } @@ -48,6 +56,11 @@ public class ZookeeperDiscoveryStatistics { failedNodesCnt++; } + /** */ + public void onCommunicationError() { + commErrCnt++; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(ZookeeperDiscoveryStatistics.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/2b23d46f/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java index 808f1ee..f9a6fa4 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java @@ -206,6 +206,9 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { private boolean failCommSpi; /** */ + private boolean blockCommSpi; + + /** */ private long sesTimeout; /** */ @@ -401,6 +404,13 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { if (failCommSpi) cfg.setCommunicationSpi(new PeerToPeerCommunicationFailureSpi()); + if (blockCommSpi) { + cfg.setCommunicationSpi(new 
TcpBlockCommunicationSpi(igniteInstanceName.contains("block")) + .setUsePairedConnections(true)); + + cfg.setNetworkTimeout(500); + } + if (commFailureRslvr != null) cfg.setCommunicationFailureResolver(commFailureRslvr.apply()); @@ -3586,6 +3596,45 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { } /** + * Test reproduces failure in case of client resolution failure + * {@link org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi#createTcpClient} from server side, further + * client reconnect and proper grid work. + * + * @throws Exception If failed. + */ + @Test + public void testClientReconnects() throws Exception { + blockCommSpi = true; + + Ignite srv1 = startGrid("server1-block"); + + clientModeThreadLocal(true); + + IgniteEx cli = startGrid("client-block"); + + IgniteCache<Object, Object> cache = cli.getOrCreateCache(DEFAULT_CACHE_NAME); + + cache.put(1, 1); + + assertEquals(cache.get(1), 1); + + assertEquals(1, srv1.cluster().forClients().nodes().size()); + + MBeanServer srv = ManagementFactory.getPlatformMBeanServer(); + + IgniteEx ignite = grid("server1-block"); + + ObjectName spiName = U.makeMBeanName(ignite.context().igniteInstanceName(), "SPIs", + ZookeeperDiscoverySpi.class.getSimpleName()); + + ZookeeperDiscoverySpiMBean bean = JMX.newMBeanProxy(srv, spiName, ZookeeperDiscoverySpiMBean.class); + + assertNotNull(bean); + + assertEquals(0, bean.getCommErrorProcNum()); + } + + /** * @throws Exception If failed. */ @Test @@ -5613,7 +5662,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Nullable @Override public DiscoveryCustomMessage ackMessage() { + @Override public @Nullable DiscoveryCustomMessage ackMessage() { return null; } @@ -5656,4 +5705,49 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { return S.toString(TestFastStopProcessCustomMessageAck.class, this); } } + + /** + * Block communications. 
+ */ + private static class TcpBlockCommunicationSpi extends TcpCommunicationSpi { + /** + * Whether this instance should actually block. + */ + private final boolean isBlocking; + + /** Blocked once. */ + private boolean alreadyBlocked; + + /** + * @param isBlocking Whether this instance should actually block. + */ + public TcpBlockCommunicationSpi(boolean isBlocking) { + this.isBlocking = isBlocking; + } + + /** {@inheritDoc} */ + @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) + throws IgniteCheckedException { + if (node.isClient() && blockHandshakeOnce(node.id())) { + ZookeeperDiscoverySpi spi = spi(ignite()); + + spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new Exception("test")); + + return null; + } + + return super.createTcpClient(node, connIdx); + } + + /** Check if this connection is blocked. */ + private boolean blockHandshakeOnce(UUID nodeId) { + if (isBlocking && !alreadyBlocked) { + alreadyBlocked = true; + + return true; + } + + return false; + } + } }