zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0bbf6077 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0bbf6077 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0bbf6077 Branch: refs/heads/ignite-zk Commit: 0bbf60775b41fae9b24debee082c8fd97fefef51 Parents: 0a9fefb Author: sboikov <sboi...@gridgain.com> Authored: Wed Jan 10 11:14:37 2018 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jan 10 11:14:37 2018 +0300 ---------------------------------------------------------------------- .../CommunicationFailureContext.java | 61 ++++ .../CommunicationFailureResolver.java | 28 ++ .../CommunicationProblemContext.java | 61 ---- .../CommunicationProblemResolver.java | 28 -- .../DefaultCommunicationFailureResolver.java | 305 +++++++++++++++++++ .../DefaultCommunicationProblemResolver.java | 305 ------------------- .../configuration/IgniteConfiguration.java | 14 +- .../apache/ignite/internal/IgniteKernal.java | 4 +- .../internal/managers/GridManagerAdapter.java | 4 +- .../discovery/GridDiscoveryManager.java | 18 +- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 4 +- .../org/apache/ignite/spi/IgniteSpiContext.java | 4 +- .../communication/tcp/TcpCommunicationSpi.java | 4 +- .../internal/ZkCommunicationFailureContext.java | 130 ++++++++ .../internal/ZkCommunicationProblemContext.java | 130 -------- .../zk/internal/ZookeeperDiscoveryImpl.java | 6 +- .../zk/internal/ZookeeperDiscoverySpiTest.java | 68 +++-- .../testframework/GridSpiTestContext.java | 4 +- 18 files changed, 590 insertions(+), 588 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java new file mode 100644 index 0000000..8a9906b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration; + +import java.util.List; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.spi.communication.CommunicationSpi; + +/** + * + */ +public interface CommunicationFailureContext { + /** + * @return Current topology snapshot. + */ + public List<ClusterNode> topologySnapshot(); + + /** + * @param node1 First node. + * @param node2 Second node. + * @return {@code True} if {@link CommunicationSpi} is able to establish connection from first node to second node. + */ + public boolean connectionAvailable(ClusterNode node1, ClusterNode node2); + + /** + * @return List of currently started cache. + */ + public List<String> startedCaches(); + + /** + * @param cacheName Cache name. + * @return Cache partitions affinity assignment. + */ + public List<List<ClusterNode>> cacheAffinity(String cacheName); + + /** + * @param cacheName Cache name. + * @return Cache partitions owners. + */ + public List<List<ClusterNode>> cachePartitionOwners(String cacheName); + + /** + * @param node Node to kill. + */ + public void killNode(ClusterNode node); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureResolver.java b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureResolver.java new file mode 100644 index 0000000..ca8b4d9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureResolver.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration; + +/** + * + */ +public interface CommunicationFailureResolver { + /** + * @param ctx Context. + */ + public void resolve(CommunicationFailureContext ctx); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemContext.java b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemContext.java deleted file mode 100644 index 4a9f140..0000000 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemContext.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.configuration; - -import java.util.List; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.spi.communication.CommunicationSpi; - -/** - * - */ -public interface CommunicationProblemContext { - /** - * @return Current topology snapshot. - */ - public List<ClusterNode> topologySnapshot(); - - /** - * @param node1 First node. - * @param node2 Second node. - * @return {@code True} if {@link CommunicationSpi} is able to establish connection from first node to second node. - */ - public boolean connectionAvailable(ClusterNode node1, ClusterNode node2); - - /** - * @return List of currently started cache. - */ - public List<String> startedCaches(); - - /** - * @param cacheName Cache name. - * @return Cache partitions affinity assignment. - */ - public List<List<ClusterNode>> cacheAffinity(String cacheName); - - /** - * @param cacheName Cache name. - * @return Cache partitions owners. - */ - public List<List<ClusterNode>> cachePartitionOwners(String cacheName); - - /** - * @param node Node to kill. - */ - public void killNode(ClusterNode node); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemResolver.java deleted file mode 100644 index d1c6f27..0000000 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemResolver.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.configuration; - -/** - * - */ -public interface CommunicationProblemResolver { - /** - * @param ctx Context. - */ - public void resolve(CommunicationProblemContext ctx); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java new file mode 100644 index 0000000..4aa6f28 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration; + +import java.util.BitSet; +import java.util.List; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.resources.LoggerResource; + +/** + * + */ +public class DefaultCommunicationFailureResolver implements CommunicationFailureResolver { + /** */ + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void resolve(CommunicationFailureContext ctx) { + ClusterGraph graph = new ClusterGraph(log, ctx); + + ClusterSearch cluster = graph.findLargestIndependentCluster(); + + List<ClusterNode> nodes = ctx.topologySnapshot(); + + assert nodes.size() > 0; + assert cluster != null; + + if (graph.checkFullyConnected(cluster.nodesBitSet)) { + assert cluster.nodeCnt <= nodes.size(); + + if (cluster.nodeCnt < nodes.size()) { + if (log.isInfoEnabled()) { + log.info("Communication problem resolver found fully connected independent cluster [" + + "clusterSrvCnt=" + cluster.srvCnt + + ", clusterTotalNodes=" + cluster.nodeCnt + + ", totalAliveNodes=" + nodes.size() + "]"); + } + + for (int i = 0; i < nodes.size(); i++) { + if (!cluster.nodesBitSet.get(i)) + ctx.killNode(nodes.get(i)); + } + } + else + U.warn(log, "All alive nodes are fully connected, this should be resolved automatically."); + } + else { + if (log.isInfoEnabled()) { + log.info("Communication problem resolver failed to find fully connected independent cluster."); + } + } + } + + /** + * @param cluster Cluster nodes mask. + * @param nodes Nodes. + * @param limit IDs limit. + * @return Cluster node IDs string. + */ + private static String clusterNodeIds(BitSet cluster, List<ClusterNode> nodes, int limit) { + int startIdx = 0; + + StringBuilder builder = new StringBuilder(); + + int cnt = 0; + + for (;;) { + int idx = cluster.nextSetBit(startIdx); + + if (idx == -1) + break; + + startIdx = idx + 1; + + if (builder.length() == 0) { + builder.append('['); + } + else + builder.append(", "); + + builder.append(nodes.get(idx).id()); + + if (cnt++ > limit) + builder.append(", ..."); + } + + builder.append(']'); + + return builder.toString(); + } + + /** + * + */ + private static class ClusterSearch { + /** */ + int srvCnt; + + /** */ + int nodeCnt; + + /** */ + final BitSet nodesBitSet; + + /** + * @param nodes Total nodes. + */ + ClusterSearch(int nodes) { + nodesBitSet = new BitSet(nodes); + } + } + + /** + * + */ + private static class ClusterGraph { + /** */ + private final static int WORD_IDX_SHIFT = 6; + + /** */ + private final IgniteLogger log; + + /** */ + private final int nodeCnt; + + /** */ + private final long[] visitBitSet; + + /** */ + private final CommunicationFailureContext ctx; + + /** */ + private final List<ClusterNode> nodes; + + /** + * @param log Logger. + * @param ctx Context. + */ + ClusterGraph(IgniteLogger log, CommunicationFailureContext ctx) { + this.log = log; + this.ctx = ctx; + + nodes = ctx.topologySnapshot(); + + nodeCnt = nodes.size(); + + assert nodeCnt > 0; + + visitBitSet = initBitSet(nodeCnt); + } + + /** + * @param bitIndex Bit index. + * @return Word index containing bit with given index. + */ + private static int wordIndex(int bitIndex) { + return bitIndex >> WORD_IDX_SHIFT; + } + + /** + * @param bitCnt Number of bits. + * @return Bit set words. + */ + static long[] initBitSet(int bitCnt) { + return new long[wordIndex(bitCnt - 1) + 1]; + } + + /** + * @return Cluster nodes bit set. + */ + ClusterSearch findLargestIndependentCluster() { + ClusterSearch maxCluster = null; + + for (int i = 0; i < nodeCnt; i++) { + if (getBit(visitBitSet, i)) + continue; + + ClusterSearch cluster = new ClusterSearch(nodeCnt); + + search(cluster, i); + + if (log.isInfoEnabled()) { + log.info("Communication problem resolver found cluster [srvCnt=" + cluster.srvCnt + + ", totalNodeCnt=" + cluster.nodeCnt + + ", nodeIds=" + clusterNodeIds(cluster.nodesBitSet, nodes, 1000) + "]"); + } + + if (maxCluster == null || cluster.srvCnt > maxCluster.srvCnt) + maxCluster = cluster; + } + + return maxCluster; + } + + /** + * @param cluster Cluster nodes bit set. + * @return {@code True} if all cluster nodes are able to connect to each other. + */ + boolean checkFullyConnected(BitSet cluster) { + int startIdx = 0; + + int clusterNodes = cluster.cardinality(); + + for (;;) { + int idx = cluster.nextSetBit(startIdx); + + if (idx == -1) + break; + + ClusterNode node1 = nodes.get(idx); + + for (int i = 0; i < clusterNodes; i++) { + if (!cluster.get(i) || i == idx) + continue; + + ClusterNode node2 = nodes.get(i); + + if (cluster.get(i) && !ctx.connectionAvailable(node1, node2)) + return false; + } + + startIdx = idx + 1; + } + + return true; + } + + /** + * @param cluster Current cluster bit set. + * @param idx Node index. + */ + void search(ClusterSearch cluster, int idx) { + assert !getBit(visitBitSet, idx); + + setBit(visitBitSet, idx); + + cluster.nodesBitSet.set(idx); + cluster.nodeCnt++; + + ClusterNode node1 = nodes.get(idx); + + if (!CU.clientNode(node1)) + cluster.srvCnt++; + + for (int i = 0; i < nodeCnt; i++) { + if (i == idx || getBit(visitBitSet, i)) + continue; + + ClusterNode node2 = nodes.get(i); + + boolean connected = ctx.connectionAvailable(node1, node2) || + ctx.connectionAvailable(node2, node1); + + if (connected) + search(cluster, i); + } + } + + /** + * @param words Bit set words. + * @param bitIndex Bit index. + */ + static void setBit(long words[], int bitIndex) { + int wordIndex = wordIndex(bitIndex); + + words[wordIndex] |= (1L << bitIndex); + } + + /** + * @param words Bit set words. + * @param bitIndex Bit index. + * @return Bit value. + */ + static boolean getBit(long[] words, int bitIndex) { + int wordIndex = wordIndex(bitIndex); + + return (words[wordIndex] & (1L << bitIndex)) != 0; + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DefaultCommunicationFailureResolver.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java deleted file mode 100644 index 163ef2a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.configuration; - -import java.util.BitSet; -import java.util.List; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.resources.LoggerResource; - -/** - * - */ -public class DefaultCommunicationProblemResolver implements CommunicationProblemResolver { - /** */ - @LoggerResource - private IgniteLogger log; - - /** {@inheritDoc} */ - @Override public void resolve(CommunicationProblemContext ctx) { - ClusterGraph graph = new ClusterGraph(log, ctx); - - ClusterSearch cluster = graph.findLargestIndependentCluster(); - - List<ClusterNode> nodes = ctx.topologySnapshot(); - - assert nodes.size() > 0; - assert cluster != null; - - if (graph.checkFullyConnected(cluster.nodesBitSet)) { - assert cluster.nodeCnt <= nodes.size(); - - if (cluster.nodeCnt < nodes.size()) { - if (log.isInfoEnabled()) { - log.info("Communication problem resolver found fully connected independent cluster [" + - "clusterSrvCnt=" + cluster.srvCnt + - ", clusterTotalNodes=" + cluster.nodeCnt + - ", totalAliveNodes=" + nodes.size() + "]"); - } - - for (int i = 0; i < nodes.size(); i++) { - if (!cluster.nodesBitSet.get(i)) - ctx.killNode(nodes.get(i)); - } - } - else - U.warn(log, "All alive nodes are fully connected, this should be resolved automatically."); - } - else { - if (log.isInfoEnabled()) { - log.info("Communication problem resolver failed to find fully connected independent cluster."); - } - } - } - - /** - * @param cluster Cluster nodes mask. - * @param nodes Nodes. - * @param limit IDs limit. - * @return Cluster node IDs string. - */ - private static String clusterNodeIds(BitSet cluster, List<ClusterNode> nodes, int limit) { - int startIdx = 0; - - StringBuilder builder = new StringBuilder(); - - int cnt = 0; - - for (;;) { - int idx = cluster.nextSetBit(startIdx); - - if (idx == -1) - break; - - startIdx = idx + 1; - - if (builder.length() == 0) { - builder.append('['); - } - else - builder.append(", "); - - builder.append(nodes.get(idx).id()); - - if (cnt++ > limit) - builder.append(", ..."); - } - - builder.append(']'); - - return builder.toString(); - } - - /** - * - */ - private static class ClusterSearch { - /** */ - int srvCnt; - - /** */ - int nodeCnt; - - /** */ - final BitSet nodesBitSet; - - /** - * @param nodes Total nodes. - */ - ClusterSearch(int nodes) { - nodesBitSet = new BitSet(nodes); - } - } - - /** - * - */ - private static class ClusterGraph { - /** */ - private final static int WORD_IDX_SHIFT = 6; - - /** */ - private final IgniteLogger log; - - /** */ - private final int nodeCnt; - - /** */ - private final long[] visitBitSet; - - /** */ - private final CommunicationProblemContext ctx; - - /** */ - private final List<ClusterNode> nodes; - - /** - * @param log Logger. - * @param ctx Context. - */ - ClusterGraph(IgniteLogger log, CommunicationProblemContext ctx) { - this.log = log; - this.ctx = ctx; - - nodes = ctx.topologySnapshot(); - - nodeCnt = nodes.size(); - - assert nodeCnt > 0; - - visitBitSet = initBitSet(nodeCnt); - } - - /** - * @param bitIndex Bit index. - * @return Word index containing bit with given index. - */ - private static int wordIndex(int bitIndex) { - return bitIndex >> WORD_IDX_SHIFT; - } - - /** - * @param bitCnt Number of bits. - * @return Bit set words. - */ - static long[] initBitSet(int bitCnt) { - return new long[wordIndex(bitCnt - 1) + 1]; - } - - /** - * @return Cluster nodes bit set. - */ - ClusterSearch findLargestIndependentCluster() { - ClusterSearch maxCluster = null; - - for (int i = 0; i < nodeCnt; i++) { - if (getBit(visitBitSet, i)) - continue; - - ClusterSearch cluster = new ClusterSearch(nodeCnt); - - search(cluster, i); - - if (log.isInfoEnabled()) { - log.info("Communication problem resolver found cluster [srvCnt=" + cluster.srvCnt + - ", totalNodeCnt=" + cluster.nodeCnt + - ", nodeIds=" + clusterNodeIds(cluster.nodesBitSet, nodes, 1000) + "]"); - } - - if (maxCluster == null || cluster.srvCnt > maxCluster.srvCnt) - maxCluster = cluster; - } - - return maxCluster; - } - - /** - * @param cluster Cluster nodes bit set. - * @return {@code True} if all cluster nodes are able to connect to each other. - */ - boolean checkFullyConnected(BitSet cluster) { - int startIdx = 0; - - int clusterNodes = cluster.cardinality(); - - for (;;) { - int idx = cluster.nextSetBit(startIdx); - - if (idx == -1) - break; - - ClusterNode node1 = nodes.get(idx); - - for (int i = 0; i < clusterNodes; i++) { - if (!cluster.get(i) || i == idx) - continue; - - ClusterNode node2 = nodes.get(i); - - if (cluster.get(i) && !ctx.connectionAvailable(node1, node2)) - return false; - } - - startIdx = idx + 1; - } - - return true; - } - - /** - * @param cluster Current cluster bit set. - * @param idx Node index. - */ - void search(ClusterSearch cluster, int idx) { - assert !getBit(visitBitSet, idx); - - setBit(visitBitSet, idx); - - cluster.nodesBitSet.set(idx); - cluster.nodeCnt++; - - ClusterNode node1 = nodes.get(idx); - - if (!CU.clientNode(node1)) - cluster.srvCnt++; - - for (int i = 0; i < nodeCnt; i++) { - if (i == idx || getBit(visitBitSet, i)) - continue; - - ClusterNode node2 = nodes.get(i); - - boolean connected = ctx.connectionAvailable(node1, node2) || - ctx.connectionAvailable(node2, node1); - - if (connected) - search(cluster, i); - } - } - - /** - * @param words Bit set words. - * @param bitIndex Bit index. - */ - static void setBit(long words[], int bitIndex) { - int wordIndex = wordIndex(bitIndex); - - words[wordIndex] |= (1L << bitIndex); - } - - /** - * @param words Bit set words. - * @param bitIndex Bit index. - * @return Bit value. - */ - static boolean getBit(long[] words, int bitIndex) { - int wordIndex = wordIndex(bitIndex); - - return (words[wordIndex] & (1L << bitIndex)) != 0; - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DefaultCommunicationProblemResolver.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index dbf5fbb..3ee8ada 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -481,7 +481,7 @@ public class IgniteConfiguration { private ClientConnectorConfiguration cliConnCfg = ClientListenerProcessor.DFLT_CLI_CFG; /** */ - private CommunicationProblemResolver commProblemRslvr; + private CommunicationFailureResolver commFailureRslvr; /** * Creates valid grid configuration with all default values. @@ -510,7 +510,7 @@ public class IgniteConfiguration { loadBalancingSpi = cfg.getLoadBalancingSpi(); indexingSpi = cfg.getIndexingSpi(); - commProblemRslvr = cfg.getCommunicationProblemResolver(); + commFailureRslvr = cfg.getCommunicationFailureResolver(); /* * Order alphabetically for maintenance purposes. @@ -599,16 +599,16 @@ public class IgniteConfiguration { * TODO ZK * @return */ - public CommunicationProblemResolver getCommunicationProblemResolver() { - return commProblemRslvr; + public CommunicationFailureResolver getCommunicationFailureResolver() { + return commFailureRslvr; } /** * TODO ZK - * @param commProblemRslvr + * @param commFailureRslvr */ - public IgniteConfiguration setCommunicationProblemResolver(CommunicationProblemResolver commProblemRslvr) { - this.commProblemRslvr = commProblemRslvr; + public IgniteConfiguration setCommunicationFailureResolver(CommunicationFailureResolver commFailureRslvr) { + this.commFailureRslvr = commFailureRslvr; return this; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 858b9f8..3556b02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2470,8 +2470,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { objs.add(cfg.getGridLogger()); objs.add(cfg.getMBeanServer()); - if (cfg.getCommunicationProblemResolver() != null) - objs.add(cfg.getCommunicationProblemResolver()); + if (cfg.getCommunicationFailureResolver() != null) + objs.add(cfg.getCommunicationFailureResolver()); return objs; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index df84212..b939078 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -604,11 +604,11 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan return ctx.nodeAttributes(); } - @Override public boolean communicationErrorResolveSupported() { + @Override public boolean communicationFailureResolveSupported() { return ctx.discovery().communicationErrorResolveSupported(); } - @Override public void resolveCommunicationError(ClusterNode node, Exception err) { + @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) { ctx.discovery().resolveCommunicationError(node, err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 2791492..95b4c05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -53,10 +53,10 @@ import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.CommunicationProblemResolver; +import org.apache.ignite.configuration.CommunicationFailureResolver; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; -import org.apache.ignite.configuration.DefaultCommunicationProblemResolver; +import org.apache.ignite.configuration.DefaultCommunicationFailureResolver; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; @@ -551,8 +551,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { }); } - if (ctx.config().getCommunicationProblemResolver() != null) - ctx.resource().injectGeneric(ctx.config().getCommunicationProblemResolver()); + if (ctx.config().getCommunicationFailureResolver() != null) + ctx.resource().injectGeneric(ctx.config().getCommunicationFailureResolver()); spi.setListener(new DiscoverySpiListener() { private long gridStartTime; @@ -2352,22 +2352,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @throws IgniteCheckedException If configuration is not valid. */ public static void initCommunicationErrorResolveConfiguration(IgniteConfiguration cfg) throws IgniteCheckedException { - CommunicationProblemResolver rslvr = cfg.getCommunicationProblemResolver(); + CommunicationFailureResolver rslvr = cfg.getCommunicationFailureResolver(); CommunicationSpi commSpi = cfg.getCommunicationSpi(); DiscoverySpi discoverySpi = cfg.getDiscoverySpi(); if (rslvr != null) { if (!supportsCommunicationErrorResolve(commSpi)) - throw new IgniteCheckedException("CommunicationProblemResolver is configured, but communication SPI does not support communication" + + throw new IgniteCheckedException("CommunicationFailureResolver is configured, but communication SPI does not support communication" + "problem resolve: " + commSpi.getClass().getName()); if (!supportsCommunicationErrorResolve(discoverySpi)) - throw new IgniteCheckedException("CommunicationProblemResolver is configured, but discovery SPI does not support communication" + + throw new IgniteCheckedException("CommunicationFailureResolver is configured, but discovery SPI does not support communication" + "problem resolve: " + discoverySpi.getClass().getName()); } else { if (supportsCommunicationErrorResolve(commSpi) && supportsCommunicationErrorResolve(discoverySpi)) - cfg.setCommunicationProblemResolver(new DefaultCommunicationProblemResolver()); + cfg.setCommunicationFailureResolver(new DefaultCommunicationFailureResolver()); } } @@ -2391,7 +2391,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if communication error resolve is supported. */ public boolean communicationErrorResolveSupported() { - return ctx.config().getCommunicationProblemResolver() != null; + return ctx.config().getCommunicationFailureResolver() != null; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 50cf9fe..d63e39e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -959,12 +959,12 @@ public abstract class IgniteSpiAdapter implements IgniteSpi { } /** {@inheritDoc} */ - @Override public boolean communicationErrorResolveSupported() { + @Override public boolean communicationFailureResolveSupported() { return false; } /** {@inheritDoc} */ - @Override public void resolveCommunicationError(ClusterNode node, Exception err) { + @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) { throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java index 01c1ea4..50f47d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java @@ -362,11 +362,11 @@ public interface IgniteSpiContext { /** * @return {@code True} if cluster supports communication error resolving. */ - public boolean communicationErrorResolveSupported(); + public boolean communicationFailureResolveSupported(); /** * @param node Problem node. * @param err Error. */ - public void resolveCommunicationError(ClusterNode node, Exception err); + public void resolveCommunicationFailure(ClusterNode node, Exception err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 566f7161..ac3ba1c 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -3416,10 +3416,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati IgniteSpiContext ctx = getSpiContext(); - if (connectionError(errs) && ctx.communicationErrorResolveSupported()) { + if (connectionError(errs) && ctx.communicationFailureResolveSupported()) { commErrResolve = true; - ctx.resolveCommunicationError(node, errs); + ctx.resolveCommunicationFailure(node, errs); } if (!commErrResolve && enableForcibleNodeKill) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java new file mode 100644 index 0000000..7dbc95d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.BitSet; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CommunicationFailureContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; + +/** + * + */ +class ZkCommunicationFailureContext implements CommunicationFailureContext { + /** */ + private static final Comparator<ClusterNode> NODE_ORDER_CMP = new Comparator<ClusterNode>() { + @Override public int compare(ClusterNode node1, ClusterNode node2) { + return Long.compare(node1.order(), node2.order()); + } + }; + + /** */ + private Set<ClusterNode> killedNodes = new HashSet<>(); + + /** */ + private final Map<UUID, BitSet> nodesState; + + /** */ + private final List<ClusterNode> initialNodes; + + /** */ + private final List<ClusterNode> curNodes; + + /** */ + private final GridCacheSharedContext ctx; + + /** + * @param ctx Context. + * @param curNodes Current topology snapshot. + * @param initialNodes Topology snapshot when communication error resolve started. + * @param nodesState Nodes communication state. + */ + ZkCommunicationFailureContext( + GridCacheSharedContext ctx, + List<ClusterNode> curNodes, + List<ClusterNode> initialNodes, + Map<UUID, BitSet> nodesState) + { + this.ctx = ctx; + this.curNodes = Collections.unmodifiableList(curNodes); + this.initialNodes = initialNodes; + this.nodesState = nodesState; + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> topologySnapshot() { + return curNodes; + } + + /** {@inheritDoc} */ + @Override public boolean connectionAvailable(ClusterNode node1, ClusterNode node2) { + BitSet nodeState = nodesState.get(node1.id()); + + if (nodeState == null) + throw new IllegalArgumentException("Invalid node: " + node1); + + int nodeIdx = Collections.binarySearch(initialNodes, node2, NODE_ORDER_CMP); + + if (nodeIdx < 0) + throw new IllegalArgumentException("Invalid node: " + node2); + + assert nodeIdx < nodeState.size() : nodeIdx; + + return nodeState.get(nodeIdx); + } + + /** {@inheritDoc} */ + @Override public List<String> startedCaches() { + return null; // TODO ZK + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> cacheAffinity(String cacheName) { + return null; // TODO ZK + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> cachePartitionOwners(String cacheName) { + return null; // TODO ZK + } + + /** {@inheritDoc} */ + @Override public void killNode(ClusterNode node) { + if (node == null) + throw new NullPointerException(); + + if (Collections.binarySearch(curNodes, node, NODE_ORDER_CMP) < 0) + throw new IllegalArgumentException("Invalid node: " + node); + + killedNodes.add(node); + } + + /** + * @return Nodes to fail. + */ + Set<ClusterNode> killedNodes() { + return killedNodes; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java deleted file mode 100644 index 6159e5e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.zk.internal; - -import java.util.BitSet; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.CommunicationProblemContext; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; - -/** - * - */ -class ZkCommunicationProblemContext implements CommunicationProblemContext { - /** */ - private static final Comparator<ClusterNode> NODE_ORDER_CMP = new Comparator<ClusterNode>() { - @Override public int compare(ClusterNode node1, ClusterNode node2) { - return Long.compare(node1.order(), node2.order()); - } - }; - - /** */ - private Set<ClusterNode> killedNodes = new HashSet<>(); - - /** */ - private final Map<UUID, BitSet> nodesState; - - /** */ - private final List<ClusterNode> initialNodes; - - /** */ - private final List<ClusterNode> curNodes; - - /** */ - private final GridCacheSharedContext ctx; - - /** - * @param ctx Context. - * @param curNodes Current topology snapshot. - * @param initialNodes Topology snapshot when communication error resolve started. - * @param nodesState Nodes communication state. - */ - ZkCommunicationProblemContext( - GridCacheSharedContext ctx, - List<ClusterNode> curNodes, - List<ClusterNode> initialNodes, - Map<UUID, BitSet> nodesState) - { - this.ctx = ctx; - this.curNodes = Collections.unmodifiableList(curNodes); - this.initialNodes = initialNodes; - this.nodesState = nodesState; - } - - /** {@inheritDoc} */ - @Override public List<ClusterNode> topologySnapshot() { - return curNodes; - } - - /** {@inheritDoc} */ - @Override public boolean connectionAvailable(ClusterNode node1, ClusterNode node2) { - BitSet nodeState = nodesState.get(node1.id()); - - if (nodeState == null) - throw new IllegalArgumentException("Invalid node: " + node1); - - int nodeIdx = Collections.binarySearch(initialNodes, node2, NODE_ORDER_CMP); - - if (nodeIdx < 0) - throw new IllegalArgumentException("Invalid node: " + node2); - - assert nodeIdx < nodeState.size() : nodeIdx; - - return nodeState.get(nodeIdx); - } - - /** {@inheritDoc} */ - @Override public List<String> startedCaches() { - return null; // TODO ZK - } - - /** {@inheritDoc} */ - @Override public List<List<ClusterNode>> cacheAffinity(String cacheName) { - return null; // TODO ZK - } - - /** {@inheritDoc} */ - @Override public List<List<ClusterNode>> cachePartitionOwners(String cacheName) { - return null; // TODO ZK - } - - /** {@inheritDoc} */ - @Override public void killNode(ClusterNode node) { - if (node == null) - throw new NullPointerException(); - - if (Collections.binarySearch(curNodes, node, NODE_ORDER_CMP) < 0) - throw new IllegalArgumentException("Invalid node: " + node); - - killedNodes.add(node); - } - - /** - * @return Nodes to fail. - */ - Set<ClusterNode> killedNodes() { - return killedNodes; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index fb57eb0..4798444 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -47,7 +47,7 @@ import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.CommunicationProblemResolver; +import org.apache.ignite.configuration.CommunicationFailureResolver; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; @@ -3143,7 +3143,7 @@ public class ZookeeperDiscoveryImpl { } } else { - CommunicationProblemResolver rslvr = spi.ignite().configuration().getCommunicationProblemResolver(); + CommunicationFailureResolver rslvr = spi.ignite().configuration().getCommunicationFailureResolver(); if (rslvr != null) { if (log.isInfoEnabled()) { @@ -3151,7 +3151,7 @@ public class ZookeeperDiscoveryImpl { ", rslvr=" + rslvr.getClass().getSimpleName() + ']'); } - ZkCommunicationProblemContext ctx = new ZkCommunicationProblemContext( + ZkCommunicationFailureContext ctx = new ZkCommunicationFailureContext( ((IgniteKernal)spi.ignite()).context().cache().context(), topSnapshot, initialNodes, http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java index 2fa798f..8a99df0 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java @@ -52,8 +52,8 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.CommunicationProblemContext; -import org.apache.ignite.configuration.CommunicationProblemResolver; +import org.apache.ignite.configuration.CommunicationFailureContext; +import org.apache.ignite.configuration.CommunicationFailureResolver; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -184,7 +184,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { private boolean persistence; /** */ - private IgniteOutClosure<CommunicationProblemResolver> commProblemRslvr; + private IgniteOutClosure<CommunicationFailureResolver> commProblemRslvr; /** */ private IgniteOutClosure<DiscoverySpiNodeAuthenticator> auth; @@ -326,7 +326,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { cfg.setCommunicationSpi(new ZkTestCommunicationSpi()); if (commProblemRslvr != null) - cfg.setCommunicationProblemResolver(commProblemRslvr.apply()); + cfg.setCommunicationFailureResolver(commProblemRslvr.apply()); return cfg; } @@ -2176,6 +2176,8 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { cache.put(i, i); waitForTopology(4); + + ignite(3).createCache(largeCacheConfiguration("c2")); } /** @@ -2383,7 +2385,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { assert nodes > 1; sesTimeout = 2000; - commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY; + commProblemRslvr = NoOpCommunicationFailureResolver.FACTORY; startGridsMultiThreaded(nodes); @@ -2416,7 +2418,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { */ public void testNoOpCommunicationErrorResolve_3() throws Exception { sesTimeout = 2000; - commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY; + commProblemRslvr = NoOpCommunicationFailureResolver.FACTORY; startGridsMultiThreaded(3); @@ -2464,7 +2466,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { testCommSpi = true; sesTimeout = 2000; - commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY; + commProblemRslvr = NoOpCommunicationFailureResolver.FACTORY; startGrid(0); @@ -2506,7 +2508,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { testCommSpi = true; sesTimeout = 2000; - commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY; + commProblemRslvr = NoOpCommunicationFailureResolver.FACTORY; startGrid(0); @@ -2617,7 +2619,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { private void communicationErrorResolve_KillNodes(int startNodes, Collection<Long> killNodes) throws Exception { testCommSpi = true; - commProblemRslvr = TestNodeKillCommunicationProblemResolver.factory(killNodes); + commProblemRslvr = TestNodeKillCommunicationFailureResolver.factory(killNodes); startGrids(startNodes); @@ -2668,7 +2670,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { sesTimeout = 2000; testCommSpi = true; - commProblemRslvr = KillCoordinatorCommunicationProblemResolver.FACTORY; + commProblemRslvr = KillCoordinatorCommunicationFailureResolver.FACTORY; startGrids(10); @@ -2712,7 +2714,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { sesTimeout = 2000; testCommSpi = true; - commProblemRslvr = KillRandomCommunicationProblemResolver.FACTORY; + commProblemRslvr = KillRandomCommunicationFailureResolver.FACTORY; startGridsMultiThreaded(10); @@ -4065,16 +4067,16 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * */ - static class NoOpCommunicationProblemResolver implements CommunicationProblemResolver { + static class NoOpCommunicationFailureResolver implements CommunicationFailureResolver { /** */ - static final IgniteOutClosure<CommunicationProblemResolver> FACTORY = new IgniteOutClosure<CommunicationProblemResolver>() { - @Override public CommunicationProblemResolver apply() { - return new NoOpCommunicationProblemResolver(); + static final IgniteOutClosure<CommunicationFailureResolver> FACTORY = new IgniteOutClosure<CommunicationFailureResolver>() { + @Override public CommunicationFailureResolver apply() { + return new NoOpCommunicationFailureResolver(); } }; /** {@inheritDoc} */ - @Override public void resolve(CommunicationProblemContext ctx) { + @Override public void resolve(CommunicationFailureContext ctx) { // No-op. } } @@ -4082,11 +4084,11 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * */ - static class KillCoordinatorCommunicationProblemResolver implements CommunicationProblemResolver { + static class KillCoordinatorCommunicationFailureResolver implements CommunicationFailureResolver { /** */ - static final IgniteOutClosure<CommunicationProblemResolver> FACTORY = new IgniteOutClosure<CommunicationProblemResolver>() { - @Override public CommunicationProblemResolver apply() { - return new KillCoordinatorCommunicationProblemResolver(); + static final IgniteOutClosure<CommunicationFailureResolver> FACTORY = new IgniteOutClosure<CommunicationFailureResolver>() { + @Override public CommunicationFailureResolver apply() { + return new KillCoordinatorCommunicationFailureResolver(); } }; @@ -4095,7 +4097,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { private IgniteLogger log; /** {@inheritDoc} */ - @Override public void resolve(CommunicationProblemContext ctx) { + @Override public void resolve(CommunicationFailureContext ctx) { List<ClusterNode> nodes = ctx.topologySnapshot(); ClusterNode node = nodes.get(0); @@ -4109,11 +4111,11 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * */ - static class KillRandomCommunicationProblemResolver implements CommunicationProblemResolver { + static class KillRandomCommunicationFailureResolver implements CommunicationFailureResolver { /** */ - static final IgniteOutClosure<CommunicationProblemResolver> FACTORY = new IgniteOutClosure<CommunicationProblemResolver>() { - @Override public CommunicationProblemResolver apply() { - return new KillRandomCommunicationProblemResolver(); + static final IgniteOutClosure<CommunicationFailureResolver> FACTORY = new IgniteOutClosure<CommunicationFailureResolver>() { + @Override public CommunicationFailureResolver apply() { + return new KillRandomCommunicationFailureResolver(); } }; @@ -4122,7 +4124,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { private IgniteLogger log; /** {@inheritDoc} */ - @Override public void resolve(CommunicationProblemContext ctx) { + @Override public void resolve(CommunicationFailureContext ctx) { List<ClusterNode> nodes = ctx.topologySnapshot(); ThreadLocalRandom rnd = ThreadLocalRandom.current(); @@ -4149,15 +4151,15 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * */ - static class TestNodeKillCommunicationProblemResolver implements CommunicationProblemResolver { + static class TestNodeKillCommunicationFailureResolver implements CommunicationFailureResolver { /** * @param killOrders Killed nodes order. * @return Factory. */ - static IgniteOutClosure<CommunicationProblemResolver> factory(final Collection<Long> killOrders) { - return new IgniteOutClosure<CommunicationProblemResolver>() { - @Override public CommunicationProblemResolver apply() { - return new TestNodeKillCommunicationProblemResolver(killOrders); + static IgniteOutClosure<CommunicationFailureResolver> factory(final Collection<Long> killOrders) { + return new IgniteOutClosure<CommunicationFailureResolver>() { + @Override public CommunicationFailureResolver apply() { + return new TestNodeKillCommunicationFailureResolver(killOrders); } }; } @@ -4168,12 +4170,12 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** * @param killOrders Killed nodes order. */ - TestNodeKillCommunicationProblemResolver(Collection<Long> killOrders) { + TestNodeKillCommunicationFailureResolver(Collection<Long> killOrders) { this.killNodeOrders = killOrders; } /** {@inheritDoc} */ - @Override public void resolve(CommunicationProblemContext ctx) { + @Override public void resolve(CommunicationFailureContext ctx) { List<ClusterNode> nodes = ctx.topologySnapshot(); assertTrue(nodes.size() > 0); http://git-wip-us.apache.org/repos/asf/ignite/blob/0bbf6077/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index 36403fa..3bd95bd 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -608,12 +608,12 @@ public class GridSpiTestContext implements IgniteSpiContext { } /** {@inheritDoc} */ - @Override public boolean communicationErrorResolveSupported() { + @Override public boolean communicationFailureResolveSupported() { return false; } /** {@inheritDoc} */ - @Override public void resolveCommunicationError(ClusterNode node, Exception err) { + @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) { throw new UnsupportedOperationException(); }