Repository: ignite Updated Branches: refs/heads/ignite-zk 953f07929 -> fe515ee55
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fe515ee5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fe515ee5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fe515ee5 Branch: refs/heads/ignite-zk Commit: fe515ee557b70e88588e08e4af77fd6b4e88d58d Parents: 953f079 Author: sboikov <sboi...@gridgain.com> Authored: Tue Dec 12 16:41:09 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Dec 12 17:10:02 2017 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 4 +-- .../managers/discovery/IgniteDiscoverySpi.java | 18 +++++++++++--- .../IgniteDiscoverySpiInternalListener.java | 4 ++- .../communication/tcp/TcpCommunicationSpi.java | 19 +++++++++++++- .../spi/discovery/tcp/TcpDiscoverySpi.java | 22 +++++++++++------ .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 14 +++++++++-- .../discovery/zk/internal/ZkIgnitePaths.java | 26 ++++++++++++++++++++ .../zk/internal/ZookeeperDiscoveryImpl.java | 13 ++++++---- .../ZookeeperDiscoverySpiBasicTest.java | 2 +- 9 files changed, 100 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fe515ee5/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index b36a607..97441d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -2200,7 +2200,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { return ctx.discovery().localNode().isClient() && (spi instanceof IgniteDiscoverySpi) && - ((IgniteDiscoverySpi)spi).reconnectSupported(); + ((IgniteDiscoverySpi)spi).clientReconnectSupported(); } /** @@ -2213,7 +2213,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { DiscoverySpi discoverySpi = getSpi(); - ((IgniteDiscoverySpi)discoverySpi).reconnect(); + ((IgniteDiscoverySpi)discoverySpi).clientReconnect(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/fe515ee5/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java index 1e4524e..9a1faa2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.managers.discovery; import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.spi.discovery.DiscoverySpi; /** @@ -34,15 +35,15 @@ public interface IgniteDiscoverySpi extends DiscoverySpi { * * @return {@code True} if SPI supports client reconnect. */ - public boolean reconnectSupported(); + public boolean clientReconnectSupported(); /** * */ - public void reconnect(); + public void clientReconnect(); /** - * + * For TESTING only. */ public void simulateNodeFailure(); @@ -52,4 +53,15 @@ public interface IgniteDiscoverySpi extends DiscoverySpi { * @param lsnr Listener. */ public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr); + + /** + * @return {@code True} if supports communication error resolve. + */ + public boolean supportsCommunicationErrorResolve(); + + /** + * @param node Problem node. + * @param err Connection error. + */ + public void onCommunicationConnectionError(ClusterNode node, Exception err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/fe515ee5/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java index eab35ce..1983ad3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java @@ -22,7 +22,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; /** - * + * For TESTING only. */ public interface IgniteDiscoverySpiInternalListener { /** @@ -31,8 +31,10 @@ public interface IgniteDiscoverySpiInternalListener { public void beforeJoin(IgniteLogger log); /** + * @param spi SPI instance. * @param log Logger. * @param msg Custom message. + * @return {@code False} to cancel event send. */ public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg); } http://git-wip-us.apache.org/repos/asf/ignite/blob/fe515ee5/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 03e64fa..0b6daa3 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -3352,7 +3352,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati "operating system firewall is disabled on local and remote hosts) " + "[addrs=" + addrs + ']'); - if (enableForcibleNodeKill) { + + boolean commErrResolve = false; + + if (connectionError(errs)) { + DiscoverySpi discoverySpi = ignite.configuration().getDiscoverySpi(); + + if (discoverySpi instanceof IgniteDiscoverySpi) { + IgniteDiscoverySpi discoverySpi0 = (IgniteDiscoverySpi)discoverySpi; + + if (discoverySpi0.supportsCommunicationErrorResolve()) { + commErrResolve = true; + + discoverySpi0.onCommunicationConnectionError(node, errs); + } + } + } + + if (!commErrResolve && enableForcibleNodeKill) { if (getSpiContext().node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) && connectionError(errs)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/fe515ee5/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 52b229f..0e2f851 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -2090,23 +2090,31 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery return ignite().configuration().getSslContextFactory() != null; } - /** - * Force reconnect to cluster. - * - * @throws IgniteSpiException If failed. - */ - public void reconnect() throws IgniteSpiException { + /** {@inheritDoc} */ + public void clientReconnect() throws IgniteSpiException { impl.reconnect(); } + /** {@inheritDoc} */ @Override public boolean knownNode(UUID nodeId) { return getNode0(nodeId) != null; } - @Override public boolean reconnectSupported() { + /** {@inheritDoc} */ + @Override public boolean clientReconnectSupported() { return !clientReconnectDisabled; } + /** {@inheritDoc} */ + @Override public boolean supportsCommunicationErrorResolve() { + return false; + } + + /** {@inheritDoc} */ + @Override public void onCommunicationConnectionError(ClusterNode node, Exception err) { + throw new UnsupportedOperationException(); + } + /** * <strong>FOR TEST ONLY!!!</strong> */ http://git-wip-us.apache.org/repos/asf/ignite/blob/fe515ee5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 3e73da0..98a22d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -175,12 +175,12 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery } /** {@inheritDoc} */ - @Override public boolean reconnectSupported() { + @Override public boolean clientReconnectSupported() { return !clientReconnectDisabled; } /** {@inheritDoc} */ - @Override public void reconnect() { + @Override public void clientReconnect() { impl.reconnect(); } @@ -190,6 +190,16 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery } /** {@inheritDoc} */ + @Override public boolean supportsCommunicationErrorResolve() { + return true; + } + + /** {@inheritDoc} */ + @Override public void onCommunicationConnectionError(ClusterNode node, Exception err) { + impl.onCommunicationConnectionError(node, err); + } + + /** {@inheritDoc} */ @Nullable @Override public Serializable consistentId() throws IgniteSpiException { if (consistentId == null) { consistentId = ignite.configuration().getConsistentId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/fe515ee5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index 0d47658..f08032a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -158,6 +158,11 @@ class ZkIgnitePaths { return clusterDir + "/" + path; } + /** + * @param nodeId Node ID. + * @param prefixId Unique prefix ID. + * @return Path. + */ String joiningNodeDataPath(UUID nodeId, UUID prefixId) { return joinDataDir + '/' + prefixId + ":" + nodeId.toString(); } @@ -216,6 +221,10 @@ class ZkIgnitePaths { return UUID.fromString(idStr); } + /** + * @param path Event path. + * @return Event unique prefix. + */ static String customEventPrefix(String path) { // <uuid prefix>:<node id>:<partCnt>|<seq> @@ -239,14 +248,31 @@ class ZkIgnitePaths { return partCnt; } + /** + * @param prefix Prefix. + * @param nodeId Node ID. + * @param partCnt Parts count. + * @return Path. + */ String createCustomEventPath(String prefix, UUID nodeId, int partCnt) { return customEvtsDir + "/" + prefix + ":" + nodeId + ":" + String.format("%04d", partCnt) + '|'; } + /** + * @param prefix Prefix. + * @param nodeId Node ID. + * @return Path. + */ String customEventPartsBasePath(String prefix, UUID nodeId) { return customEvtsPartsDir + "/" + prefix + ":" + nodeId + ":"; } + /** + * @param prefix Prefix. + * @param nodeId Node ID. + * @param part Part number. + * @return Path. + */ String customEventPartPath(String prefix, UUID nodeId, int part) { return customEventPartsBasePath(prefix, nodeId) + String.format("%04d", part); } http://git-wip-us.apache.org/repos/asf/ignite/blob/fe515ee5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index effecbb..783595f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -81,7 +81,7 @@ import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL; import static org.apache.zookeeper.CreateMode.PERSISTENT; /** - * TODO ZK: check if compression makes sense. + * */ public class ZookeeperDiscoveryImpl { /** */ @@ -221,11 +221,14 @@ public class ZookeeperDiscoveryImpl { private final AtomicReference<ZkCommunicationErrorProcessFuture> commErrProcFut = new AtomicReference<>(); /** - * @param nodeId Problem node ID + * @param node0 Problem node ID * @param err Connect error. */ - public void onCommunicationError(UUID nodeId, Exception err) { - ZookeeperClusterNode node = node(nodeId); + public void onCommunicationConnectionError(ClusterNode node0, Exception err) { + if (true) + return; + + ZookeeperClusterNode node = node(node0.id()); if (node == null) return; @@ -245,7 +248,7 @@ public class ZookeeperDiscoveryImpl { } try { - fut.nodeStatusFuture(nodeId).get(); + fut.nodeStatusFuture(node.id()).get(); } catch (IgniteCheckedException e) { throw new IgniteSpiException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/fe515ee5/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index af5fc8f..3f6a8dc 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -1582,7 +1582,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { @Override public Void call() throws Exception { ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(SRVS)); - spi.reconnect(); + spi.clientReconnect(); return null; }