Repository: ignite Updated Branches: refs/heads/ignite-zk-ce 7e8f85ff8 -> cbe9980b4
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cbe9980b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cbe9980b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cbe9980b Branch: refs/heads/ignite-zk-ce Commit: cbe9980b45d6135ef6f67610f751c1599df9e54a Parents: 7e8f85f Author: sboikov <sboi...@gridgain.com> Authored: Thu Dec 14 14:48:26 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Dec 14 18:04:38 2017 +0300 ---------------------------------------------------------------------- .../CommunicationProblemContext.java | 62 ++++++++++ .../CommunicationProblemResolver.java | 28 +++++ .../configuration/IgniteConfiguration.java | 13 ++ .../ignite/internal/util/nio/GridNioServer.java | 10 +- .../communication/tcp/TcpCommunicationSpi.java | 49 ++++++++ .../discovery/CommunicationProblemContext.java | 62 ---------- .../discovery/CommunicationProblemResolver.java | 28 ----- .../DefaultCommunicationProblemResolver.java | 2 + .../ZkCommunicationErrorProcessFuture.java | 11 +- .../internal/ZkCommunicationProblemContext.java | 65 ++++++++++ .../ZkDistributedCollectDataFuture.java | 6 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 59 ++++++++-- .../ZookeeperDiscoverySpiBasicTest.java | 118 ++++++++++++++++++- 13 files changed, 401 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemContext.java b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemContext.java new file mode 100644 index 0000000..9d53a97 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemContext.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration; + +import java.util.List; +import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.spi.communication.CommunicationSpi; + +/** + * + */ +public interface CommunicationProblemContext { + /** + * @return Current topology snapshot. + */ + public List<ClusterNode> topologySnapshot(); + + /** + * @param node1 First node. + * @param node2 Second node. + * @return {@code True} if {@link CommunicationSpi} is able to establish connection from first node to second node. + */ + public boolean connectionAvailable(ClusterNode node1, ClusterNode node2); + + /** + * @return List of currently started cache. + */ + public List<String> startedCaches(); + + /** + * @param cacheName Cache name. + * @return Cache partitions affinity assignment. + */ + public List<List<ClusterNode>> cacheAffinity(String cacheName); + + /** + * @param cacheName Cache name. + * @return Cache partitions owners. + */ + public List<List<ClusterNode>> cachePartitionOwners(String cacheName); + + /** + * @param node Node to kill. + */ + public void killNode(ClusterNode node); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemResolver.java new file mode 100644 index 0000000..d1c6f27 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemResolver.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration; + +/** + * + */ +public interface CommunicationProblemResolver { + /** + * @param ctx Context. + */ + public void resolve(CommunicationProblemContext ctx); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index fc1fb6b..8c3c818 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -480,6 +480,9 @@ public class IgniteConfiguration { /** Client connector configuration. */ private ClientConnectorConfiguration cliConnCfg = ClientListenerProcessor.DFLT_CLI_CFG; + /** */ + private CommunicationProblemResolver commProblemRslvr; + /** * Creates valid grid configuration with all default values. */ @@ -507,6 +510,8 @@ public class IgniteConfiguration { loadBalancingSpi = cfg.getLoadBalancingSpi(); indexingSpi = cfg.getIndexingSpi(); + commProblemRslvr = cfg.getCommunicationProblemResolver(); + /* * Order alphabetically for maintenance purposes. */ @@ -590,6 +595,14 @@ public class IgniteConfiguration { warmupClos = cfg.getWarmupClosure(); } + public CommunicationProblemResolver getCommunicationProblemResolver() { + return commProblemRslvr; + } + + public void setCommunicationProblemResolver(CommunicationProblemResolver commProblemRslvr) { + this.commProblemRslvr = commProblemRslvr; + } + /** * Gets optional grid name. Returns {@code null} if non-default grid name was not * provided. http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 1d595d2..14d55d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -485,6 +485,14 @@ public class GridNioServer<T> { return fut; } + public void closeFromWorkerThread(GridNioSession ses) { + assert ses instanceof GridSelectorNioSessionImpl : ses; + + GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses; + + ((AbstractNioClientWorker)ses0.worker()).close((GridSelectorNioSessionImpl)ses, null); + } + /** * @param ses Session. * @param msg Message. @@ -834,7 +842,7 @@ public class GridNioServer<T> { NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta); if (async) { - assert meta != null; + // assert meta != null; req.op = NioOperation.CONNECT; } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 0b6daa3..e815312 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -32,6 +32,7 @@ import java.nio.channels.SocketChannel; import java.nio.channels.spi.AbstractInterruptibleChannel; import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -81,6 +82,7 @@ import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter; import org.apache.ignite.internal.util.nio.GridDirectParser; import org.apache.ignite.internal.util.nio.GridNioCodecFilter; import org.apache.ignite.internal.util.nio.GridNioFilter; +import org.apache.ignite.internal.util.nio.GridNioFuture; import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory; import org.apache.ignite.internal.util.nio.GridNioMessageTracker; import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory; @@ -2564,6 +2566,53 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati sendMessage0(node, msg, null); } + public IgniteFuture<BitSet> pingNodes(List<ClusterNode> nodes) { + ClusterNode node = nodes.get(0); + + try { + LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node); + + // /172.25.4.90:45012 + + for (InetSocketAddress addr : addrs) { + SocketChannel ch = SocketChannel.open(); + + ch.configureBlocking(false); + + ch.socket().setTcpNoDelay(tcpNoDelay); + ch.socket().setKeepAlive(true); + + boolean connect = ch.connect(addr); + + if (!connect) { + GridNioFuture<GridNioSession> fut = nioSrvr.createSession(ch, null, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() { + @Override public void apply(IgniteInternalFuture<GridNioSession> fut) { + try { + GridNioSession ses = fut.get(); + + log.info("Ping connected"); + + nioSrvr.closeFromWorkerThread(ses); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }); + + fut.get(); + } + else + log.info("Connected"); + } + } + catch (Exception e) { + throw new IgniteSpiException(e); + } + + return null; + } + /** * Sends given message to destination node. Note that characteristics of the * exchange such as durability, guaranteed delivery or error notification is http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java deleted file mode 100644 index 71673f1..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery; - -import java.util.List; -import java.util.UUID; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.spi.communication.CommunicationSpi; - -/** - * - */ -public interface CommunicationProblemContext { - /** - * @return Current topology snapshot. - */ - public List<ClusterNode> topologySnapshot(); - - /** - * @param node1 First node. - * @param node2 Second node. - * @return {@code True} if {@link CommunicationSpi} is able to establish connection from first node to second node. - */ - public boolean connectionAvailable(ClusterNode node1, ClusterNode node2); - - /** - * @return List of currently started cache. - */ - public List<String> startedCaches(); - - /** - * @param cacheName Cache name. - * @return Cache partitions affinity assignment. - */ - public List<List<ClusterNode>> cacheAffinity(String cacheName); - - /** - * @param cacheName Cache name. - * @return Cache partitions owners. - */ - public List<List<ClusterNode>> cachePartitionOwners(String cacheName); - - /** - * @param node Node to kill. - */ - public void killNode(ClusterNode node); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java deleted file mode 100644 index a9b620b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery; - -/** - * - */ -public interface CommunicationProblemResolver { - /** - * @param ctx Context. - */ - public void resolve(CommunicationProblemContext ctx); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java index 4d0262d..b2d4bf0 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java @@ -20,6 +20,8 @@ package org.apache.ignite.spi.discovery; import java.util.BitSet; import java.util.List; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CommunicationProblemContext; +import org.apache.ignite.configuration.CommunicationProblemResolver; /** * http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java index 15744a2..6812ab0 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -30,11 +31,13 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.IgniteSpiTimeoutObject; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.jboss.netty.util.internal.ConcurrentHashMap; import org.jetbrains.annotations.Nullable; /** - * + * Future is created on each node when either connection error occurs or resolve communication error request + * received. */ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implements IgniteSpiTimeoutObject, Runnable { /** */ @@ -144,8 +147,12 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen * @param nodes Nodes to ping. * @throws Exception If failed. */ - void pingNodesAndNotifyFuture(long locNodeOrder, ZkRuntimeState rtState, String futPath, Collection<ClusterNode> nodes) + void pingNodesAndNotifyFuture(long locNodeOrder, ZkRuntimeState rtState, String futPath, List<ClusterNode> nodes) throws Exception { + TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi(); + + spi.pingNodes(nodes); + ZkDistributedCollectDataFuture.saveNodeResult(futPath, rtState.zkClient, locNodeOrder, null); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java new file mode 100644 index 0000000..fd11b55 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CommunicationProblemContext; + +/** + * + */ +class ZkCommunicationProblemContext implements CommunicationProblemContext { + /** */ + private Set<ClusterNode> killedNodes = new HashSet<>(); + + /** {@inheritDoc} */ + @Override public List<ClusterNode> topologySnapshot() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean connectionAvailable(ClusterNode node1, ClusterNode node2) { + return false; + } + + /** {@inheritDoc} */ + @Override public List<String> startedCaches() { + return null; + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> cacheAffinity(String cacheName) { + return null; + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> cachePartitionOwners(String cacheName) { + return null; + } + + /** {@inheritDoc} */ + @Override public void killNode(ClusterNode node) { + if (node == null) + throw new NullPointerException(); + + killedNodes.add(node); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java index d33001b..e5d2356 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java @@ -78,8 +78,10 @@ class ZkDistributedCollectDataFuture extends GridFutureAdapter<Void> { if (remainingNodes.isEmpty()) completeAndNotifyListener(); else { - if (log.isInfoEnabled()) - log.info("Initialize data collect future [futPath=" + futPath + ", nodes=" + remainingNodes.size() + ']'); + if (log.isInfoEnabled()) { + log.info("Initialize data collect future [futPath=" + futPath + ", " + + "remainingNodes=" + remainingNodes.size() + ']'); + } rtState.zkClient.getChildrenAsync(futPath, watcher, watcher); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 82d9c4b..65bf6e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -41,6 +41,7 @@ import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CommunicationProblemResolver; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; @@ -1043,6 +1044,22 @@ public class ZookeeperDiscoveryImpl { assert locNode.order() > 0 : locNode; assert rtState.evtsData != null; + UUID futId = rtState.evtsData.communicationErrorResolveFutureId(); + + if (futId != null) { + if (log.isInfoEnabled()) { + log.info("New discovery coordinator will handle already started cluster-wide communication " + + "error resolve [reqId=" + futId + ']'); + } + + ZkCommunicationErrorProcessFuture fut = commErrProcFut.get(); + + ZkDistributedCollectDataFuture collectResFut = collectCommunicationStatusFuture(futId); + + if (fut != null) + fut.nodeResultCollectFuture(collectResFut); + } + for (ZkDiscoveryEventData evtData : rtState.evtsData.evts.values()) evtData.initRemainingAcks(rtState.top.nodesByOrder.values()); @@ -2106,16 +2123,7 @@ public class ZookeeperDiscoveryImpl { final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); if (rtState.crd) { - ZkDistributedCollectDataFuture nodeResFut = new ZkDistributedCollectDataFuture(this, rtState, futPath, - new Callable<Void>() { - @Override public Void call() throws Exception { - // Future is completed from ZK event thread. - finishCommunicationResolveProcess(rtState); - - return null; - } - } - ); + ZkDistributedCollectDataFuture nodeResFut = collectCommunicationStatusFuture(msg.id); fut.nodeResultCollectFuture(nodeResFut); } @@ -2128,14 +2136,35 @@ public class ZookeeperDiscoveryImpl { } /** + * @param futId Future ID. + * @return Future. + * @throws Exception If failed. + */ + private ZkDistributedCollectDataFuture collectCommunicationStatusFuture(UUID futId) throws Exception { + return new ZkDistributedCollectDataFuture(this, rtState, zkPaths.distributedFutureBasePath(futId), + new Callable<Void>() { + @Override public Void call() throws Exception { + // Future is completed from ZK event thread. + onCommunicationResolveStatusReceived(rtState); + + return null; + } + } + ); + } + + /** * @param rtState Runtime state. * @throws Exception If failed. */ - private void finishCommunicationResolveProcess(ZkRuntimeState rtState) throws Exception { + private void onCommunicationResolveStatusReceived(ZkRuntimeState rtState) throws Exception { ZkDiscoveryEventsData evtsData = rtState.evtsData; UUID futId = rtState.evtsData.communicationErrorResolveFutureId(); + if (log.isInfoEnabled()) + log.info("Received communication status from all nodes, call resolver [reqId=" + futId + ']'); + assert futId != null; ZkCommunicationErrorResolveFinishMessage msg = new ZkCommunicationErrorResolveFinishMessage(futId); @@ -2148,6 +2177,14 @@ public class ZookeeperDiscoveryImpl { rtState.zkClient, marshalZip(res)); + CommunicationProblemResolver rslvr = spi.ignite().configuration().getCommunicationProblemResolver(); + + if (rslvr != null) { + ZkCommunicationProblemContext ctx = new ZkCommunicationProblemContext(); + + rslvr.resolve(ctx); + } + evtsData.evtIdGen++; ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData( http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index ec70be6..64fcd34 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.io.File; import java.io.Serializable; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -66,13 +67,14 @@ import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.logger.java.JavaLogger; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.spi.discovery.CommunicationProblemContext; -import org.apache.ignite.spi.discovery.CommunicationProblemResolver; +import org.apache.ignite.configuration.CommunicationProblemContext; +import org.apache.ignite.configuration.CommunicationProblemResolver; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; @@ -121,6 +123,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { private boolean testSockNio; /** */ + private boolean testCommSpi; + + /** */ private int sesTimeout; /** */ @@ -139,7 +144,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { private boolean persistence; /** */ - private CommunicationProblemResolver communicationProblemResolver; + private CommunicationProblemResolver commProblemRslvr; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { @@ -236,6 +241,12 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { cfg.setDataStorageConfiguration(memCfg); } + if (testCommSpi) + cfg.setCommunicationSpi(new ZkTestCommunicationSpi()); + + if (commProblemRslvr != null) + cfg.setCommunicationProblemResolver(commProblemRslvr); + return cfg; } @@ -1792,7 +1803,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { assert nodes > 1; sesTimeout = 2000; - communicationProblemResolver = new NoOpCommunicationProblemResolver(); + commProblemRslvr = new NoOpCommunicationProblemResolver(); startGridsMultiThreaded(nodes); @@ -1824,7 +1835,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { public void testNoOpCommunicationErrorResolve_3() throws Exception { // One node fails before sending communication status. sesTimeout = 2000; - communicationProblemResolver = new NoOpCommunicationProblemResolver(); + commProblemRslvr = new NoOpCommunicationProblemResolver(); startGridsMultiThreaded(3); @@ -1864,6 +1875,64 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testNoOpCommunicationErrorResolve_4() throws Exception { + // Coordinator changes while resolve process is in progress. + testCommSpi = true; + + sesTimeout = 2000; + commProblemRslvr = new NoOpCommunicationProblemResolver(); + + startGrid(0); + + startGridsMultiThreaded(1, 3); + + ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(3)); + + commSpi.pingLatch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() { + ZookeeperDiscoverySpi spi = spi(ignite(1)); + + spi.onCommunicationConnectionError(ignite(2).cluster().localNode(), new Exception("test")); + + return null; + } + }); + + U.sleep(1000); + + assertFalse(fut.isDone()); + + stopGrid(0); + + commSpi.pingLatch.countDown(); + + fut.get(); + + waitForTopology(3); + } + + /** + * TODO ZK: move to comm spi tests. + * + * @throws Exception If failed. + */ + public void testNodesPing() throws Exception { + startGrids(3); + + TcpCommunicationSpi spi = (TcpCommunicationSpi)ignite(1).configuration().getCommunicationSpi(); + + List<ClusterNode> nodes = new ArrayList<>(); + + nodes.add(ignite(2).cluster().localNode()); + + spi.pingNodes(nodes); + } + + /** * @param dfltConsistenId Default consistent ID flag. * @throws Exception If failed. */ @@ -2298,6 +2367,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { }, 10000)); } + /** + * @param node Node. + */ private static void closeZkClient(Ignite node) { DiscoverySpi spi = node.configuration().getDiscoverySpi(); @@ -2322,6 +2394,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @param spi Spi instance. + * @return Zookeeper client. */ private static ZooKeeper zkClient(ZookeeperDiscoverySpi spi) { return GridTestUtils.getFieldValue(spi, "impl", "rtState", "zkClient", "zk"); @@ -2333,6 +2406,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @param log Logger. * @param clients Clients. * @param disconnectedC Closure which will be run when client node disconnected. + * @param closeSock {@code True} to simulate reconnect by closing zk client's socket. * @throws Exception If failed. */ public static void reconnectClientNodes(final IgniteLogger log, @@ -2449,7 +2523,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @param latch Latch. * @throws Exception If failed. */ - protected static void waitReconnectEvent(IgniteLogger log, CountDownLatch latch) throws Exception { + private static void waitReconnectEvent(IgniteLogger log, CountDownLatch latch) throws Exception { if (!latch.await(30_000, MILLISECONDS)) { log.error("Failed to wait for reconnect event, will dump threads, latch count: " + latch.getCount()); @@ -2475,6 +2549,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * */ + @SuppressWarnings("MismatchedReadAndWriteOfArray") static class TestAffinityFunction extends RendezvousAffinityFunction { /** */ private static final long serialVersionUID = 0L; @@ -2536,4 +2611,35 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { // No-op. } } + + /** + * + */ + static class ZkTestCommunicationSpi extends TcpCommunicationSpi { + /** */ + private volatile CountDownLatch pingLatch; + + /** + * @param ignite Node. + * @return Node's communication SPI. + */ + static ZkTestCommunicationSpi forNode(Ignite ignite) { + return (ZkTestCommunicationSpi)ignite.configuration().getCommunicationSpi(); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<BitSet> pingNodes(List<ClusterNode> nodes) { + CountDownLatch pingLatch = this.pingLatch; + + try { + if (pingLatch != null) + pingLatch.await(); + } + catch (InterruptedException e) { + throw new IgniteException(e); + } + + return super.pingNodes(nodes); + } + } }