Repository: ignite
Updated Branches:
  refs/heads/master a1897dfd1 -> 2b23d46fe


IGNITE-9493 Do not call communication error resolver in case of client node 
failed

Signed-off-by: Pavel Kovalenko <jokse...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2b23d46f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2b23d46f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2b23d46f

Branch: refs/heads/master
Commit: 2b23d46feba75b1eb51c8373c730b20504d1b30a
Parents: a1897df
Author: zstan <stanilov...@gmail.com>
Authored: Tue Dec 25 12:01:34 2018 +0300
Committer: Pavel Kovalenko <jokse...@gmail.com>
Committed: Tue Dec 25 12:01:34 2018 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |  9 +-
 .../zk/ZookeeperDiscoverySpiMBean.java          |  8 ++
 .../zk/internal/ZookeeperDiscoveryImpl.java     |  5 +
 .../internal/ZookeeperDiscoveryStatistics.java  | 21 ++++-
 .../zk/internal/ZookeeperDiscoverySpiTest.java  | 96 +++++++++++++++++++-
 5 files changed, 132 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2b23d46f/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 222b73b..287bd45 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -595,12 +595,17 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements IgniteDis
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public UUID getCoordinator() {
+        @Override public long getCommErrorProcNum() {
+            return stats.commErrorCount();
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable UUID getCoordinator() {
             return impl.getCoordinator();
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public String getCoordinatorNodeFormatted() {
+        @Override public @Nullable String getCoordinatorNodeFormatted() {
             return String.valueOf(impl.node(impl.getCoordinator()));
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b23d46f/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiMBean.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiMBean.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiMBean.java
index 1eed0b4..05a3dc2 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiMBean.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiMBean.java
@@ -58,6 +58,14 @@ public interface ZookeeperDiscoverySpiMBean extends 
IgniteSpiManagementMBean, Di
     public String getZkSessionId();
 
     /**
+     * Gets the number of communication error resolver calls.
+     *
+     * @return Number of communication error resolver calls.
+     */
+    @MXBeanDescription("Communication error resolver call count.")
+    public long getCommErrorProcNum();
+
+    /**
      * Gets root path in ZooKeeper cluster Zk client uses to put data to.
      *
      * @return Zk Root Path.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b23d46f/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index fa218ff..d57c8d6 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -300,6 +300,9 @@ public class ZookeeperDiscoveryImpl {
      * @param err Connect error.
      */
     public void resolveCommunicationError(ClusterNode node0, Exception err) {
+        if (node0.isClient())
+            return;
+
         ZookeeperClusterNode node = node(node0.id());
 
         if (node == null)
@@ -317,6 +320,8 @@ public class ZookeeperDiscoveryImpl {
                     this,
                     node.sessionTimeout() + 1000);
 
+                stats.onCommunicationError();
+
                 if (commErrProcFut.compareAndSet(fut, newFut)) {
                     fut = newFut;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b23d46f/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java
index 678cf11..cc95dd3 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java
@@ -23,22 +23,30 @@ import org.apache.ignite.internal.util.typedef.internal.S;
  */
 public class ZookeeperDiscoveryStatistics {
     /** */
-    private int joinedNodesCnt;
+    private long joinedNodesCnt;
 
     /** */
-    private int failedNodesCnt;
+    private long failedNodesCnt;
+
+    /** Communication error count. */
+    private long commErrCnt;
 
     /** */
-    public int joinedNodesCnt() {
+    public long joinedNodesCnt() {
         return joinedNodesCnt;
     }
 
     /** */
-    public int failedNodesCnt() {
+    public long failedNodesCnt() {
         return failedNodesCnt;
     }
 
     /** */
+    public long commErrorCount() {
+        return commErrCnt;
+    }
+
+    /** */
     public void onNodeJoined() {
         joinedNodesCnt++;
     }
@@ -48,6 +56,11 @@ public class ZookeeperDiscoveryStatistics {
         failedNodesCnt++;
     }
 
+    /** */
+    public void onCommunicationError() {
+        commErrCnt++;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ZookeeperDiscoveryStatistics.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b23d46f/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
index 808f1ee..f9a6fa4 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
@@ -206,6 +206,9 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
     private boolean failCommSpi;
 
     /** */
+    private boolean blockCommSpi;
+
+    /** */
     private long sesTimeout;
 
     /** */
@@ -401,6 +404,13 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
         if (failCommSpi)
             cfg.setCommunicationSpi(new PeerToPeerCommunicationFailureSpi());
 
+        if (blockCommSpi) {
+            cfg.setCommunicationSpi(new 
TcpBlockCommunicationSpi(igniteInstanceName.contains("block"))
+                .setUsePairedConnections(true));
+
+            cfg.setNetworkTimeout(500);
+        }
+
         if (commFailureRslvr != null)
             cfg.setCommunicationFailureResolver(commFailureRslvr.apply());
 
@@ -3586,6 +3596,45 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Test reproduces the failure that occurs when client resolution fails in
+     * {@link 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi#createTcpClient} 
on the server side, followed by
+     * client reconnect and proper grid operation.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testClientReconnects() throws Exception {
+        blockCommSpi = true;
+
+        Ignite srv1 = startGrid("server1-block");
+
+        clientModeThreadLocal(true);
+
+        IgniteEx cli = startGrid("client-block");
+
+        IgniteCache<Object, Object> cache = 
cli.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        cache.put(1, 1);
+
+        assertEquals(cache.get(1), 1);
+
+        assertEquals(1, srv1.cluster().forClients().nodes().size());
+
+        MBeanServer srv = ManagementFactory.getPlatformMBeanServer();
+
+        IgniteEx ignite = grid("server1-block");
+
+        ObjectName spiName = 
U.makeMBeanName(ignite.context().igniteInstanceName(), "SPIs",
+            ZookeeperDiscoverySpi.class.getSimpleName());
+
+        ZookeeperDiscoverySpiMBean bean = JMX.newMBeanProxy(srv, spiName, 
ZookeeperDiscoverySpiMBean.class);
+
+        assertNotNull(bean);
+
+        assertEquals(0, bean.getCommErrorProcNum());
+    }
+
+    /**
      * @throws Exception If failed.
      */
     @Test
@@ -5613,7 +5662,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        @Override public @Nullable  DiscoveryCustomMessage ackMessage() {
             return null;
         }
 
@@ -5656,4 +5705,49 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
             return S.toString(TestFastStopProcessCustomMessageAck.class, this);
         }
     }
+
+    /**
+     * Block communications.
+     */
+    private static class TcpBlockCommunicationSpi extends TcpCommunicationSpi {
+        /**
+         * Whether this instance should actually block.
+         */
+        private final boolean isBlocking;
+
+        /** Blocked once. */
+        private boolean alreadyBlocked;
+
+        /**
+         * @param isBlocking Whether this instance should actually block.
+         */
+        public TcpBlockCommunicationSpi(boolean isBlocking) {
+            this.isBlocking = isBlocking;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected GridCommunicationClient 
createTcpClient(ClusterNode node, int connIdx)
+            throws IgniteCheckedException {
+            if (node.isClient() && blockHandshakeOnce(node.id())) {
+                ZookeeperDiscoverySpi spi = spi(ignite());
+
+                
spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new 
Exception("test"));
+
+                return null;
+            }
+
+            return super.createTcpClient(node, connIdx);
+        }
+
+        /** Returns {@code true} once, on the first call, if this instance is blocking. */
+        private boolean blockHandshakeOnce(UUID nodeId) {
+            if (isBlocking && !alreadyBlocked) {
+                alreadyBlocked = true;
+
+                return true;
+            }
+
+            return false;
+        }
+    }
 }

Reply via email to