http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java new file mode 100644 index 0000000..5645791 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; + +/** + * Future that completes once every node of the topology captured at construction time has either + * shown up as a child of the future path or disappeared from the topology. + */ +class ZkDistributedCollectDataFuture extends GridFutureAdapter<Void> { + /** Logger taken from the discovery impl. */ + private final IgniteLogger log; + + /** Future path whose children are watched. */ + private final String futPath; + + /** Orders of nodes that are still awaited. */ + private final Set<Long> remainingNodes; + + /** Listener called when this future completes. */ + private final Callable<Void> lsnr; + + /** + * @param impl Discovery impl. + * @param rtState Runtime state. + * @param futPath Future path. + * @param lsnr Future listener. + * @throws Exception If listener call failed. + */ + ZkDistributedCollectDataFuture( + ZookeeperDiscoveryImpl impl, + ZkRuntimeState rtState, + String futPath, + Callable<Void> lsnr) + throws Exception + { + this.log = impl.log(); + this.futPath = futPath; + this.lsnr = lsnr; + + ZkClusterNodes top = rtState.top; + + // Assume new nodes can not join while future is in progress.
+ + remainingNodes = U.newHashSet(top.nodesByOrder.size()); + + for (ZookeeperClusterNode node : top.nodesByInternalId.values()) + remainingNodes.add(node.order()); + + NodeResultsWatcher watcher = new NodeResultsWatcher(rtState, impl); + + if (remainingNodes.isEmpty()) + completeAndNotifyListener(); + else { + if (log.isInfoEnabled()) { + log.info("Initialize data collect future [futPath=" + futPath + ", " + + "remainingNodes=" + remainingNodes.size() + ']'); + } + + rtState.zkClient.getChildrenAsync(futPath, watcher, watcher); + } + } + + /** + * Completes this future and calls the listener only if this call is the one that completed it. + * + * @throws Exception If listener call failed. + */ + private void completeAndNotifyListener() throws Exception { + if (super.onDone()) + lsnr.call(); + } + + /** + * @param futPath Future path. + * @param client Client. + * @param nodeOrder Node order, used as the name of the child created under the future path. + * @param data Result data. + * @throws Exception If failed. + */ + static void saveNodeResult(String futPath, ZookeeperClient client, long nodeOrder, byte[] data) throws Exception { + client.createIfNeeded(futPath + "/" + nodeOrder, data, CreateMode.PERSISTENT); + } + + /** + * @param futPath Future path. + * @param client Client. + * @param nodeOrder Node order, used as the name of the child read under the future path. + * @return Node result data. + * @throws Exception If failed. + */ + static byte[] readNodeResult(String futPath, ZookeeperClient client, long nodeOrder) throws Exception { + return client.getData(futPath + "/" + nodeOrder); + } + + /** + * @param futResPath Result path. + * @param client Client. + * @param data Result data. + * @throws Exception If failed. + */ + static void saveResult(String futResPath, ZookeeperClient client, byte[] data) throws Exception { + client.createIfNeeded(futResPath, data, CreateMode.PERSISTENT); + } + + /** + * @param client Client. + * @param paths Paths utils. + * @param futId Future ID. + * @return Data read from the future result path. + * @throws Exception If failed. + */ + static byte[] readResult(ZookeeperClient client, ZkIgnitePaths paths, UUID futId) throws Exception { + return client.getData(paths.distributedFutureResultPath(futId)); + } + + /** + * Deletes the children of the future base path, the base path itself and the future result path. + * + * @param client Client. + * @param paths Paths utils. + * @param futId Future ID. + * @throws Exception If failed.
+ */ + static void deleteFutureData(ZookeeperClient client, ZkIgnitePaths paths, UUID futId) throws Exception { + // TODO ZK: use multi, better batching + max-size safe + NoNodeException safe. + String evtDir = paths.distributedFutureBasePath(futId); + + try { + client.deleteAll(evtDir, + client.getChildren(evtDir), + -1); + } + catch (KeeperException.NoNodeException e) { + // TODO ZK + } + + client.deleteIfExists(evtDir, -1); + + client.deleteIfExists(paths.distributedFutureResultPath(futId), -1); + } + + /** + * Stops waiting for nodes whose order is no longer present in the given topology and completes + * the future when no awaited nodes remain. + * + * @param top Current topology. + * @throws Exception If listener call failed. + */ + void onTopologyChange(ZkClusterNodes top) throws Exception { + if (remainingNodes.isEmpty()) + return; + + for (Iterator<Long> it = remainingNodes.iterator(); it.hasNext();) { + Long nodeOrder = it.next(); + + if (!top.nodesByOrder.containsKey(nodeOrder)) { + it.remove(); + + int remaining = remainingNodes.size(); + + if (log.isInfoEnabled()) { + log.info("ZkDistributedCollectDataFuture removed remaining failed node [node=" + nodeOrder + + ", remaining=" + remaining + + ", futPath=" + futPath + ']'); + } + + if (remaining == 0) { + completeAndNotifyListener(); + + break; + } + } + } + } + + /** + * Watcher and children callback: re-requests the children list on a children-changed event and + * removes every returned child (parsed as a node order) from the awaited nodes. + */ + class NodeResultsWatcher extends ZkAbstractWatcher implements AsyncCallback.Children2Callback { + /** + * @param rtState Runtime state. + * @param impl Discovery impl.
+ */ + NodeResultsWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { + super(rtState, impl); + } + + /** {@inheritDoc} */ + @Override protected void process0(WatchedEvent evt) { + if (evt.getType() == Watcher.Event.EventType.NodeChildrenChanged) + rtState.zkClient.getChildrenAsync(evt.getPath(), this, this); + } + + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + if (!onProcessStart()) + return; + + try { + if (!isDone()) { + assert rc == 0 : KeeperException.Code.get(rc); + + for (int i = 0; i < children.size(); i++) { + Long nodeOrder = Long.parseLong(children.get(i)); + + if (remainingNodes.remove(nodeOrder)) { + int remaining = remainingNodes.size(); + + if (log.isInfoEnabled()) { + log.info("ZkDistributedCollectDataFuture added new result [node=" + nodeOrder + + ", remaining=" + remaining + + ", futPath=" + path + ']'); + } + + if (remaining == 0) + completeAndNotifyListener(); + } + } + } + + onProcessEnd(); + } + catch (Throwable e) { + onProcessError(e); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java new file mode 100644 index 0000000..9d8980e --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.jetbrains.annotations.Nullable; + +/** + * Immutable custom discovery message holding a node internal ID and a warning text. + * NOTE(review): judging by the class name it requests a forced failure of that node - confirm against usages. + */ +public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Node internal ID. */ + final long nodeInternalId; + + /** Warning to be displayed on all nodes. */ + final String warning; + + /** + * @param nodeInternalId Node internal ID.
+ * @param warning Warning to be displayed on all nodes. + */ + ZkForceNodeFailMessage(long nodeInternalId, String warning) { + this.nodeInternalId = nodeInternalId; + this.warning = warning; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZkForceNodeFailMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java new file mode 100644 index 0000000..642183b --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.UUID; + +/** + * Builds the ZooKeeper paths used under the cluster root directory and parses values back out of + * alive node and custom event path names. + */ +class ZkIgnitePaths { + /** Path separator. */ + static final String PATH_SEPARATOR = "/"; + + /** Bit of the alive node flags that marks a client node. */ + private static final byte CLIENT_NODE_FLAG_MASK = 0x01; + + /** Number of characters taken from a path when a UUID is parsed out of it. */ + private static final int UUID_LEN = 36; + + /** Directory to store joined node data. */ + private static final String JOIN_DATA_DIR = "jd"; + + /** Directory to store new custom events. */ + private static final String CUSTOM_EVTS_DIR = "ce"; + + /** Directory to store parts of multi-parts custom events. */ + private static final String CUSTOM_EVTS_PARTS_DIR = "cp"; + + /** Directory to store acknowledge messages for custom events. */ + private static final String CUSTOM_EVTS_ACKS_DIR = "ca"; + + /** Directory to store EPHEMERAL znodes for alive cluster nodes. */ + static final String ALIVE_NODES_DIR = "n"; + + /** Path to store discovery events {@link ZkDiscoveryEventsData}. */ + private static final String DISCO_EVENTS_PATH = "e"; + + /** Root directory, exactly as passed to the constructor. */ + final String clusterDir; + + /** Full path of {@link #ALIVE_NODES_DIR} under the root directory. */ + final String aliveNodesDir; + + /** Full path of {@link #JOIN_DATA_DIR} under the root directory. */ + final String joinDataDir; + + /** Full path of {@link #DISCO_EVENTS_PATH} under the root directory. */ + final String evtsPath; + + /** Full path of {@link #CUSTOM_EVTS_DIR} under the root directory. */ + final String customEvtsDir; + + /** Full path of {@link #CUSTOM_EVTS_PARTS_DIR} under the root directory. */ + final String customEvtsPartsDir; + + /** Full path of {@link #CUSTOM_EVTS_ACKS_DIR} under the root directory. */ + final String customEvtsAcksDir; + + /** + * @param zkRootPath Base Zookeeper directory for all Ignite nodes. + */ + ZkIgnitePaths(String zkRootPath) { + clusterDir = zkRootPath; + + aliveNodesDir = zkPath(ALIVE_NODES_DIR); + joinDataDir = zkPath(JOIN_DATA_DIR); + evtsPath = zkPath(DISCO_EVENTS_PATH); + customEvtsDir = zkPath(CUSTOM_EVTS_DIR); + customEvtsPartsDir = zkPath(CUSTOM_EVTS_PARTS_DIR); + customEvtsAcksDir = zkPath(CUSTOM_EVTS_ACKS_DIR); + } + + /** + * TODO ZK: copied from curator.
+ * + * Validate the provided znode path string. + * NOTE(review): the character range checks use strict comparisons, so the boundary code points + * themselves (U+001F, U+007F, U+009F, U+D800, U+F8FF, U+FFF0, U+FFFF) are not rejected; + * confirm that this matches the validation done by the ZooKeeper version in use. + * + * @param path znode path string + * @return The given path if it was valid, for fluent chaining + * @throws IllegalArgumentException if the path is invalid + */ + static String validatePath(String path) throws IllegalArgumentException { + if (path == null) { + throw new IllegalArgumentException("Path cannot be null"); + } + if (path.length() == 0) { + throw new IllegalArgumentException("Path length must be > 0"); + } + if (path.charAt(0) != '/') { + throw new IllegalArgumentException( + "Path must start with / character"); + } + if (path.length() == 1) { // done checking - it's the root + return path; + } + if (path.charAt(path.length() - 1) == '/') { + throw new IllegalArgumentException( + "Path must not end with / character"); + } + + String reason = null; + char lastc = '/'; + char chars[] = path.toCharArray(); + char c; + for (int i = 1; i < chars.length; lastc = chars[i], i++) { + c = chars[i]; + + if (c == 0) { + reason = "null character not allowed @" + i; + break; + } else if (c == '/' && lastc == '/') { + reason = "empty node name specified @" + i; + break; + } else if (c == '.' && lastc == '.') { + if (chars[i-2] == '/' && + ((i + 1 == chars.length) + || chars[i+1] == '/')) { + reason = "relative paths not allowed @" + i; + break; + } + } else if (c == '.') { + if (chars[i-1] == '/' && + ((i + 1 == chars.length) + || chars[i+1] == '/')) { + reason = "relative paths not allowed @" + i; + break; + } + } else if (c > '\u0000' && c < '\u001f' + || c > '\u007f' && c < '\u009F' + || c > '\ud800' && c < '\uf8ff' + || c > '\ufff0' && c < '\uffff') { + reason = "invalid charater @" + i; + break; + } + } + + if (reason != null) { + throw new IllegalArgumentException( + "Invalid path string \"" + path + "\" caused by " + reason); + } + + return path; + } + + /** + * @param path Relative path. + * @return Full path: the cluster root directory, a slash and the given relative path.
+ */ + private String zkPath(String path) { + return clusterDir + "/" + path; + } + + /** + * @param nodeId Node ID. + * @param prefixId Unique prefix ID. + * @return Path. + */ + String joiningNodeDataPath(UUID nodeId, UUID prefixId) { + return joinDataDir + '/' + prefixId + ":" + nodeId.toString(); + } + + /** + * @param path Alive node zk path. + * @return Node internal ID: the text after the last '|' in the path, parsed as an int. + */ + static long aliveInternalId(String path) { + int idx = path.lastIndexOf('|'); + + return Integer.parseInt(path.substring(idx + 1)); + } + + /** + * @param prefix Node unique path prefix. + * @param node Node. + * @return Path. + */ + String aliveNodePathForCreate(String prefix, ZookeeperClusterNode node) { + byte flags = 0; + + if (node.isClient()) + flags |= CLIENT_NODE_FLAG_MASK; + + return aliveNodesDir + "/" + prefix + ":" + node.id() + ":" + encodeFlags(flags) + "|"; + } + + /** + * @param path Alive node zk path. + * @return {@code True} if node is client. + */ + static boolean aliveNodeClientFlag(String path) { + return (aliveFlags(path) & CLIENT_NODE_FLAG_MASK) != 0; + } + + /** + * @param path Alive node zk path. + * @return Node unique prefix ID, parsed from the first {@link #UUID_LEN} characters of the path. + */ + static UUID aliveNodePrefixId(String path) { + return UUID.fromString(path.substring(0, ZkIgnitePaths.UUID_LEN)); + } + + /** + * @param path Alive node zk path. + * @return Node ID. + */ + static UUID aliveNodeId(String path) { + // <uuid prefix>:<node id>:<flags>|<alive seq> + int startIdx = ZkIgnitePaths.UUID_LEN + 1; + + String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN); + + return UUID.fromString(idStr); + } + + /** + * @param path Event zk path. + * @return Event sequence number. + */ + static int customEventSequence(String path) { + int idx = path.lastIndexOf('|'); + + return Integer.parseInt(path.substring(idx + 1)); + } + + /** + * @param path Custom event zk path. + * @return ID of the node recorded in the event path.
+ */ + static UUID customEventSendNodeId(String path) { + // <uuid prefix>:<node id>:<partCnt>|<seq> + int startIdx = ZkIgnitePaths.UUID_LEN + 1; + + String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN); + + return UUID.fromString(idStr); + } + + /** + * @param path Event path. + * @return Event unique prefix. + */ + static String customEventPrefix(String path) { + // <uuid prefix>:<node id>:<partCnt>|<seq> + + return path.substring(0, ZkIgnitePaths.UUID_LEN); + } + + /** + * @param path Custom event zk path. + * @return Parts count, parsed from the 4 characters that follow the node ID in the path. + */ + static int customEventPartsCount(String path) { + // <uuid prefix>:<node id>:<partCnt>|<seq> + int startIdx = 2 * ZkIgnitePaths.UUID_LEN + 2; + + String cntStr = path.substring(startIdx, startIdx + 4); + + int partCnt = Integer.parseInt(cntStr); + + assert partCnt >= 1 : partCnt; + + return partCnt; + } + + /** + * @param prefix Prefix. + * @param nodeId Node ID. + * @param partCnt Parts count, written zero-padded to at least 4 digits. + * @return Path. + */ + String createCustomEventPath(String prefix, UUID nodeId, int partCnt) { + return customEvtsDir + "/" + prefix + ":" + nodeId + ":" + String.format("%04d", partCnt) + '|'; + } + + /** + * @param prefix Prefix. + * @param nodeId Node ID. + * @return Path. + */ + String customEventPartsBasePath(String prefix, UUID nodeId) { + return customEvtsPartsDir + "/" + prefix + ":" + nodeId + ":"; + } + + /** + * @param prefix Prefix. + * @param nodeId Node ID. + * @param part Part number. + * @return Path. + */ + String customEventPartPath(String prefix, UUID nodeId, int part) { + return customEventPartsBasePath(prefix, nodeId) + String.format("%04d", part); + } + + /** + * @param evtId Event ID. + * @return Event zk path. + */ + String joinEventDataPathForJoined(long evtId) { + return evtsPath + "/fj-" + evtId; + } + + /** + * @param topVer Event topology version. + * @return Event zk path.
+ */ + String joinEventSecuritySubjectPath(long topVer) { + return evtsPath + "/s-" + topVer; + } + + /** + * @param origEvtId ID of original custom event. + * @return Path for custom event ack. + */ + String ackEventDataPath(long origEvtId) { + assert origEvtId != 0; + + return customEvtsAcksDir + "/" + String.valueOf(origEvtId); + } + + /** + * @param id Future ID. + * @return Future path. + */ + String distributedFutureBasePath(UUID id) { + return evtsPath + "/f-" + id; + } + + /** + * @param id Future ID. + * @return Future result path. + */ + String distributedFutureResultPath(UUID id) { + return evtsPath + "/fr-" + id; + } + + /** + * @param flags Flags. + * @return Flags string: the value shifted by 128 and written as two hexadecimal digits. + */ + private static String encodeFlags(byte flags) { + int intVal = flags + 128; + + String str = Integer.toString(intVal, 16); + + if (str.length() == 1) + str = '0' + str; + + assert str.length() == 2 : str; + + return str; + } + + /** + * @param path Alive node zk path. + * @return Flags, decoded from the two characters that follow the last ':' in the path. + */ + private static byte aliveFlags(String path) { + int startIdx = path.lastIndexOf(':') + 1; + + String flagsStr = path.substring(startIdx, startIdx + 2); + + return (byte)(Integer.parseInt(flagsStr, 16) - 128); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java new file mode 100644 index 0000000..a73312c --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +/** + * Internal message holding the internal ID of a joining node and an error message. + */ +class ZkInternalJoinErrorMessage implements ZkInternalMessage { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** + * Transient flag, initialized to {@code true}. + * NOTE(review): presumably tells whether the joining node should be notified - confirm against usages. + */ + transient boolean notifyNode = true; + + /** Joining node internal ID. */ + final long nodeInternalId; + + /** Error message. */ + final String err; + + /** + * @param nodeInternalId Joining node internal ID. + * @param err Error message. + */ + ZkInternalJoinErrorMessage(long nodeInternalId, String err) { + this.nodeInternalId = nodeInternalId; + this.err = err; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java new file mode 100644 index 0000000..c1d56f0 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; + +/** + * Package-private marker interface for serializable internal messages; it declares no members. + */ +interface ZkInternalMessage extends Serializable { + // No-op. +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java new file mode 100644 index 0000000..e4ae4ba0 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import org.jetbrains.annotations.Nullable; + +/** + * Serializable holder of a topology (list of nodes) and discovery data bytes looked up by node order. + */ +class ZkJoinEventDataForJoined implements Serializable { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Topology. */ + private final List<ZookeeperClusterNode> top; + + /** Discovery data bytes keyed by node order. */ + private final Map<Long, byte[]> discoData; + + /** + * For a node order that has no own entry in {@link #discoData}: the order of the node whose + * entry is used instead. May be {@code null}. + */ + private final Map<Long, Long> dupDiscoData; + + /** + * @param top Topology. + * @param discoData Discovery data, must not be empty. + * @param dupDiscoData Optional mapping from a node order to the order of the node whose discovery data it shares. + */ + ZkJoinEventDataForJoined(List<ZookeeperClusterNode> top, Map<Long, byte[]> discoData, @Nullable Map<Long, Long> dupDiscoData) { + assert top != null; + assert discoData != null && !discoData.isEmpty(); + + this.top = top; + this.discoData = discoData; + this.dupDiscoData = dupDiscoData; + } + + /** + * Looks the data up directly by node order first and otherwise follows the duplicates mapping. + * + * @param nodeOrder Node order. + * @return Discovery data bytes for the node. + */ + byte[] discoveryDataForNode(long nodeOrder) { + assert discoData != null; + + byte[] dataBytes = discoData.get(nodeOrder); + + if (dataBytes != null) + return dataBytes; + + assert dupDiscoData != null; + + Long dupDataNode = dupDiscoData.get(nodeOrder); + + assert dupDataNode != null; + + dataBytes = discoData.get(dupDataNode); + + assert dataBytes != null; + + return dataBytes; + } + + /** + * @return Current topology.
+ */ + List<ZookeeperClusterNode> topology() { + assert top != null; + + return top; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java new file mode 100644 index 0000000..8149afc --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.UUID; + +/** + * Serializable data of a node join event: topology version, node IDs and part counts. + */ +public class ZkJoinedNodeEvtData implements Serializable { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Topology version for node join event. */ + final long topVer; + + /** Joined node internal ID. */ + final long joinedInternalId; + + /** Joined node ID. */ + final UUID nodeId; + + /** Join data part count. */ + final int joinDataPartCnt; + + /** Security subject part count. */ + final int secSubjPartCnt; + + /** Join data unique prefix. */ + final UUID joinDataPrefixId; + + /** Joining node data; transient and not set by the constructor. */ + transient ZkJoiningNodeData joiningNodeData; + + /** + * @param topVer Topology version for node join event. + * @param nodeId Joined node ID. + * @param joinedInternalId Joined node internal ID. + * @param joinDataPrefixId Join data unique prefix. + * @param joinDataPartCnt Join data part count. + * @param secSubjPartCnt Security subject part count. + */ + ZkJoinedNodeEvtData( + long topVer, + UUID nodeId, + long joinedInternalId, + UUID joinDataPrefixId, + int joinDataPartCnt, + int secSubjPartCnt) + { + this.topVer = topVer; + this.nodeId = nodeId; + this.joinedInternalId = joinedInternalId; + this.joinDataPrefixId = joinDataPrefixId; + this.joinDataPartCnt = joinDataPartCnt; + this.secSubjPartCnt = secSubjPartCnt; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "ZkJoinedNodeData [id=" + nodeId + ", order=" + topVer + ']'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java new file mode 100644 index 0000000..ff8311d --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java @@ -0,0 +1,87 @@ +/* + *
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.Map; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Serializable data of a joining node. Depending on the constructor used it holds either only a + * parts count of a multi-parts message, or the node together with its discovery data. + */ +class ZkJoiningNodeData implements Serializable { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Number of parts in multi-parts message. */ + private int partCnt; + + /** Node; set only by the two-argument constructor. */ + @GridToStringInclude + private ZookeeperClusterNode node; + + /** Discovery data; set only by the two-argument constructor. */ + @GridToStringInclude + private Map<Integer, Serializable> discoData; + + /** + * @param partCnt Number of parts in multi-parts message. + */ + ZkJoiningNodeData(int partCnt) { + this.partCnt = partCnt; + } + + /** + * @param node Node. + * @param discoData Discovery data. + */ + ZkJoiningNodeData(ZookeeperClusterNode node, Map<Integer, Serializable> discoData) { + assert node != null && node.id() != null : node; + assert discoData != null; + + this.node = node; + this.discoData = discoData; + } + + /** + * @return Number of parts in multi-parts message. + */ + int partCount() { + return partCnt; + } + + /** + * @return Node.
+ */ + ZookeeperClusterNode node() { + return node; + } + + /** + * @return Discovery data. + */ + Map<Integer, Serializable> discoveryData() { + return discoData; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZkJoiningNodeData.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java new file mode 100644 index 0000000..626fe74 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.jetbrains.annotations.Nullable; + +/** + * Stateless internal custom discovery message: it has no ack message, is not mutable and does not stop the process. + * NOTE(review): the name suggests it is sent when no server nodes are left - confirm against the sender. + */ +class ZkNoServersMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZkNoServersMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java new file mode 100644 index 0000000..2abfee3 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +/** + * Node validation result holding either an error or a marshalled security subject, depending on the constructor used. + */ +class ZkNodeValidateResult { + /** Error; stays {@code null} when the security-subject constructor is used. */ + String err; + + /** Marshalled security subject; stays {@code null} when the error constructor is used. */ + byte[] secSubjZipBytes; + + /** + * @param err Error. + */ + ZkNodeValidateResult(String err) { + this.err = err; + } + + /** + * @param secSubjZipBytes Marshalled security subject. + */ + ZkNodeValidateResult(byte[] secSubjZipBytes) { + this.secSubjZipBytes = secSubjZipBytes; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java new file mode 100644 index 0000000..fb6cf89 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +/** + * + */ +public abstract class ZkRunnable extends ZkAbstractCallabck implements Runnable { + /** + * @param rtState Runtime state. + * @param impl Discovery impl. + */ + ZkRunnable(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { + super(rtState, impl); + } + + /** {@inheritDoc} */ + @Override public void run() { + if (!onProcessStart()) + return; + + try { + run0(); + + onProcessEnd(); + } + catch (Throwable e) { + onProcessError(e); + } + } + + /** + * + */ + protected abstract void run0() throws Exception; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java new file mode 100644 index 0000000..6792154 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.List; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.spi.IgniteSpiTimeoutObject; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.Watcher; + +/** + * Mutable holder of discovery runtime state (watchers, ZooKeeper client, topology, timeout objects). + * Fields are package-private and accessed directly. + */ +class ZkRuntimeState { + /** Watcher, set by {@link #init}. */ + ZkWatcher watcher; + + /** Alive nodes data watcher, set by {@link #init}. */ + ZkAliveNodeDataWatcher aliveNodeDataWatcher; + + /** Error passed to {@link #onCloseStart}. */ + volatile Exception errForClose; + + /** {@code True} if joined topology before reconnect attempt. */ + final boolean prevJoined; + + /** ZooKeeper client; {@link #onCloseStart} forwards to it when it is not {@code null}. */ + ZookeeperClient zkClient; + + /** */ + long internalOrder; + + /** */ + int joinDataPartCnt; + + /** */ + long gridStartTime; + + /** */ + volatile boolean joined; + + /** */ + ZkDiscoveryEventsData evtsData; + + /** */ + boolean crd; + + /** */ + String locNodeZkPath; + + /** */ + final ZkAliveNodeData locNodeInfo = new ZkAliveNodeData(); + + /** */ + int procEvtCnt; + + /** */ + final ZkClusterNodes top = new ZkClusterNodes(); + + /** */ + List<ClusterNode> commErrProcNodes; + + /** Timeout callback registering watcher for join error + * (set this watcher after timeout as a minor optimization). + */ + ZkTimeoutObject joinErrTo; + + /** Timeout callback set to wait for join timeout. */ + ZkTimeoutObject joinTo; + + /** Timeout callback to update processed events counter. */ + ZkTimeoutObject procEvtsUpdateTo; + + /** + * @param prevJoined {@code True} if joined topology before reconnect attempt. 
+ */ + ZkRuntimeState(boolean prevJoined) { + this.prevJoined = prevJoined; + } + + /** + * Stores the watchers in this state. + * + * @param watcher Watcher. + * @param aliveNodeDataWatcher Alive nodes data watcher. + */ + void init(ZkWatcher watcher, ZkAliveNodeDataWatcher aliveNodeDataWatcher) { + this.watcher = watcher; + this.aliveNodeDataWatcher = aliveNodeDataWatcher; + } + + /** + * Records the close error and notifies the ZooKeeper client (if already created) that close has started. + * + * @param err Error. + */ + void onCloseStart(Exception err) { + assert err != null; + + errForClose = err; + + ZookeeperClient zkClient = this.zkClient; + + if (zkClient != null) + zkClient.onCloseStart(); + } + + /** + * Combined watcher and children/data callback type. + */ + interface ZkWatcher extends Watcher, AsyncCallback.Children2Callback, AsyncCallback.DataCallback { + // No-op. + } + + /** + * Combined watcher and data callback type. + */ + interface ZkAliveNodeDataWatcher extends Watcher, AsyncCallback.DataCallback { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java new file mode 100644 index 0000000..4d3d5b4 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.IgniteSpiTimeoutObject; + +/** + * Base timeout object with a random id, an end time fixed at construction and a cancellation flag. + */ +abstract class ZkTimeoutObject implements IgniteSpiTimeoutObject { + /** Random object id. */ + private final IgniteUuid id = IgniteUuid.randomUuid(); + + /** End time computed in the constructor. */ + private final long endTime; + + /** Cancellation flag. */ + volatile boolean cancelled; + + /** + * @param timeout Timeout in milliseconds counted from now; negative value means {@link Long#MAX_VALUE} end time. + */ + ZkTimeoutObject(long timeout) { + long deadline = Long.MAX_VALUE; + + if (timeout >= 0) { + long sum = System.currentTimeMillis() + timeout; + + // Negative sum means overflow, keep the maximum value then. + if (sum >= 0) + deadline = sum; + } + + endTime = deadline; + } + + /** {@inheritDoc} */ + @Override public final IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Override public final long endTime() { + return endTime; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java new file mode 100644 index 0000000..786d997 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -0,0 +1,1196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.jetbrains.annotations.Nullable; + +/** + * TODO ZK: limit reconnect attempts. 
+ */ +public class ZookeeperClient implements Watcher { + /** Wait time (ms) between retries of a failed operation; system property IGNITE_ZOOKEEPER_DISCOVERY_RETRY_TIMEOUT. */ + private static final long RETRY_TIMEOUT = + IgniteSystemProperties.getLong("IGNITE_ZOOKEEPER_DISCOVERY_RETRY_TIMEOUT", 1000); + + /** Maximum request size used to decide whether node data has to be split into parts. */ + private static final int MAX_REQ_SIZE = 1048528; + + /** ACL used for all created nodes. */ + private static final List<ACL> ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; + + /** Data used instead of {@code null}. */ + private static final byte[] EMPTY_BYTES = {}; + + /** ZooKeeper client. */ + private final ZooKeeper zk; + + /** Logger. */ + private final IgniteLogger log; + + /** Connection state, changed under {@link #stateMux}. */ + private ConnectionState state = ConnectionState.Disconnected; + + /** Connection loss timeout, initialized with the session timeout. */ + private long connLossTimeout; + + /** Time when the current connection attempt started. */ + private volatile long connStartTime; + + /** Mutex for connection state changes; also used to wait between retries. */ + private final Object stateMux = new Object(); + + /** Lost connection callback. */ + private final IgniteRunnable connLostC; + + /** Timer used to schedule connection checks. */ + private final Timer connTimer; + + /** Async operations to execute again when connection is restored. */ + private final ArrayDeque<ZkAsyncOperation> retryQ = new ArrayDeque<>(); + + /** Set when close has started. */ + private volatile boolean closing; + + /** + * @param log Logger. + * @param connectString ZK connection string. + * @param sesTimeout ZK session timeout. + * @param connLostC Lost connection callback. + * @throws Exception If failed. + */ + ZookeeperClient(IgniteLogger log, String connectString, int sesTimeout, IgniteRunnable connLostC) throws Exception { + this(null, log, connectString, sesTimeout, connLostC); + } + + /** + * @param igniteInstanceName Ignite instance name. + * @param log Logger. + * @param connectString ZK connection string. + * @param sesTimeout ZK session timeout. + * @param connLostC Lost connection callback. + * @throws Exception If failed. 
+ */ + ZookeeperClient(String igniteInstanceName, + IgniteLogger log, + String connectString, + int sesTimeout, + IgniteRunnable connLostC) + throws Exception + { + this.log = log.getLogger(getClass()); + this.connLostC = connLostC; + + connLossTimeout = sesTimeout; + + long connStartTime = this.connStartTime = System.currentTimeMillis(); + + connTimer = new Timer("zk-client-timer-" + igniteInstanceName); + + String threadName = Thread.currentThread().getName(); + + // ZK generates internal threads' names using current thread name. + Thread.currentThread().setName("zk-" + igniteInstanceName); + + boolean created = false; + + try { + zk = new ZooKeeper(connectString, sesTimeout, this); + + created = true; + } + finally { + Thread.currentThread().setName(threadName); + + // Timer thread is started by the Timer constructor: cancel it if ZooKeeper creation failed. + if (!created) + connTimer.cancel(); + } + + synchronized (stateMux) { + if (connStartTime == this.connStartTime && state == ConnectionState.Disconnected) + scheduleConnectionCheck(); + } + } + + /** + * @return Zookeeper client. + */ + ZooKeeper zk() { + return zk; + } + + /** + * @return {@code True} if connected to ZooKeeper. + */ + boolean connected() { + synchronized (stateMux) { + return state == ConnectionState.Connected; + } + } + + /** {@inheritDoc} */ + @Override public void process(WatchedEvent evt) { + if (closing) + return; + + if (evt.getType() == Event.EventType.None) { + ConnectionState newState; + + synchronized (stateMux) { + if (state == ConnectionState.Lost) { + U.warn(log, "Received event after connection was lost [evtState=" + evt.getState() + "]"); + + return; + } + + if (!zk.getState().isAlive()) + return; + + Event.KeeperState zkState = evt.getState(); + + switch (zkState) { + case SaslAuthenticated: + return; // No-op. 
+ + case AuthFailed: + newState = state; + + break; + + case Disconnected: + newState = ConnectionState.Disconnected; + + break; + + case SyncConnected: + newState = ConnectionState.Connected; + + break; + + case Expired: + newState = ConnectionState.Lost; + + break; + + default: + U.error(log, "Unexpected state for ZooKeeper client, close connection: " + zkState); + + newState = ConnectionState.Lost; + } + + if (newState != state) { + if (log.isInfoEnabled()) + log.info("ZooKeeper client state changed [prevState=" + state + ", newState=" + newState + ']'); + + state = newState; + + if (newState == ConnectionState.Disconnected) { + connStartTime = System.currentTimeMillis(); + + scheduleConnectionCheck(); + } + else if (newState == ConnectionState.Connected) + stateMux.notifyAll(); + else + assert state == ConnectionState.Lost : state; + } + else + return; + } + + if (newState == ConnectionState.Lost) { + closeClient(); + + notifyConnectionLost(); + } + else if (newState == ConnectionState.Connected) { + // Drain the queue so that every operation is retried once per reconnect; an operation failing again + // is queued again by its result callback (see DeleteIfExistsOperation). + ZkAsyncOperation op; + + while ((op = retryQ.poll()) != null) + op.execute(); + } + } + } + + /** + * Runs the lost connection callback if client is not closing, connection state is lost and callback is set. + */ + private void notifyConnectionLost() { + if (!closing && state == ConnectionState.Lost && connLostC != null) + connLostC.run(); + } + + /** + * @param path Path. + * @return {@code True} if node exists. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ + boolean exists(String path) throws ZookeeperClientFailedException, InterruptedException { + for (;;) { + long connStartTime = this.connStartTime; + + try { + return zk.exists(path, false) != null; + } + catch (Exception e) { + onZookeeperError(connStartTime, e); + } + } + } + + /** + * Creates all given nodes with empty data using a single multi request. + * + * @param paths Paths to create. + * @param createMode Create mode. + * @throws KeeperException.NodeExistsException If at least one of target node already exists. + * @throws ZookeeperClientFailedException If connection to zk was lost. 
+ * @throws InterruptedException If interrupted. + */ + void createAll(List<String> paths, CreateMode createMode) + throws ZookeeperClientFailedException, InterruptedException, KeeperException.NodeExistsException + { + // TODO ZK: need check for max size? + List<Op> ops = new ArrayList<>(paths.size()); + + for (String path : paths) + ops.add(Op.create(path, EMPTY_BYTES, ZK_ACL, createMode)); + + // Retry loop: onZookeeperError either returns (retry) or throws. + for (;;) { + long connStartTime = this.connStartTime; + + try { + zk.multi(ops); + + return; + } + catch (KeeperException.NodeExistsException e) { + throw e; + } + catch (Exception e) { + onZookeeperError(connStartTime, e); + } + } + + } + + /** + * @param path Path. + * @param data Data. + * @param overhead Extra overhead. + * @return {@code True} If data size exceeds max request size and should be split into multiple parts. + */ + boolean needSplitNodeData(String path, byte[] data, int overhead) { + return requestOverhead(path) + data.length + overhead > MAX_REQ_SIZE; + } + + /** + * Splits data into consecutive parts, each part (except possibly the last one) has the maximum size + * allowed for the given path and overhead. + * + * @param path Path. + * @param data Data. + * @param overhead Extra overhead. + * @return Split data. + */ + List<byte[]> splitNodeData(String path, byte[] data, int overhead) { + int partSize = MAX_REQ_SIZE - requestOverhead(path) - overhead; + + // Number of parts is data length divided by part size, rounded up. + int partCnt = data.length / partSize; + + if (data.length % partSize != 0) + partCnt++; + + assert partCnt > 1 : "Do not need split"; + + List<byte[]> parts = new ArrayList<>(partCnt); + + int remaining = data.length; + + for (int i = 0; i < partCnt; i++) { + int partSize0 = Math.min(remaining, partSize); + + byte[] part = new byte[partSize0]; + + System.arraycopy(data, i * partSize, part, 0, part.length); + + remaining -= partSize0; + + parts.add(part); + } + + assert remaining == 0 : remaining; + + return parts; + } + + /** + * TODO ZK: it seems not always precise, e.g. if ACL is used? + * @param path Request path. + * @return Marshalled request overhead. 
+ */ + private int requestOverhead(String path) { + return path.length(); + } + + /** + * Creates node with the given data; if the node already exists it is left as is and its path is returned. + * + * @param path Path. + * @param data Data. + * @param createMode Create mode. + * @return Created path. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ + String createIfNeeded(String path, byte[] data, CreateMode createMode) + throws ZookeeperClientFailedException, InterruptedException + { + assert !createMode.isSequential() : createMode; + + if (data == null) + data = EMPTY_BYTES; + + // Retry loop: onZookeeperError either returns (retry) or throws. + for (;;) { + long connStartTime = this.connStartTime; + + try { + return zk.create(path, data, ZK_ACL, createMode); + } + catch (KeeperException.NodeExistsException e) { + if (log.isDebugEnabled()) + log.debug("Node already exists: " + path); + + return path; + } + catch (Exception e) { + onZookeeperError(connStartTime, e); + } + } + } + + /** + * @param checkPrefix Unique prefix to check in case of retry. + * @param parentPath Parent node path. + * @param path Node to create. + * @param data Node data. + * @param createMode Create mode. + * @return Create path. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. 
+ */ + String createSequential(String checkPrefix, String parentPath, String path, byte[] data, CreateMode createMode) + throws ZookeeperClientFailedException, InterruptedException + { + assert createMode.isSequential() : createMode; + + if (data == null) + data = EMPTY_BYTES; + + boolean first = true; + + for (;;) { + long connStartTime = this.connStartTime; + + try { + if (!first) { + List<String> children = zk.getChildren(parentPath, false); + + for (int i = 0; i < children.size(); i++) { + String child = children.get(i); + + if (children.get(i).startsWith(checkPrefix)) { + String resPath = parentPath + "/" + child; + + if (log.isDebugEnabled()) + log.debug("Check before retry, node already created: " + resPath); + + return resPath; + } + } + } + + return zk.create(path, data, ZK_ACL, createMode); + } + catch (KeeperException.NodeExistsException e) { + assert !createMode.isSequential() : createMode; + + if (log.isDebugEnabled()) + log.debug("Node already exists: " + path); + + return path; + } + catch (Exception e) { + onZookeeperError(connStartTime, e); + } + + first = false; + } + } + + /** + * @param path Path. + * @return Children nodes. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ + List<String> getChildren(String path) + throws ZookeeperClientFailedException, InterruptedException + { + for (;;) { + long connStartTime = this.connStartTime; + + try { + return zk.getChildren(path, false); + } + catch (Exception e) { + onZookeeperError(connStartTime, e); + } + } + } + + /** + * @param path Path. + * @throws InterruptedException If interrupted. + * @throws KeeperException In case of error. + * @return {@code True} if given path exists. + */ + boolean existsNoRetry(String path) throws InterruptedException, KeeperException { + return zk.exists(path, false) != null; + } + + /** + * @param path Path. + * @param ver Expected version. + * @throws InterruptedException If interrupted. 
+ * @throws KeeperException In case of error. + */ + void deleteIfExistsNoRetry(String path, int ver) throws InterruptedException, KeeperException { + try { + zk.delete(path, ver); + } + catch (KeeperException.NoNodeException e) { + // No-op if znode does not exist. + } + } + + /** + * Deletes node (with retries), does nothing if the node does not exist. + * + * @param path Path. + * @param ver Version. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ + void deleteIfExists(String path, int ver) + throws ZookeeperClientFailedException, InterruptedException + { + try { + delete(path, ver); + } + catch (KeeperException.NoNodeException e) { + // No-op if znode does not exist. + } + } + + /** + * Deletes all given nodes using a single multi request; does nothing for an empty list. + * + * @param parent Parent path. + * @param paths Children paths. + * @param ver Version. + * @throws KeeperException.NoNodeException If at least one of nodes does not exist. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ + void deleteAll(@Nullable String parent, List<String> paths, int ver) + throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException + { + if (paths.isEmpty()) + return; + + // TODO ZK: need check for max size? + List<Op> ops = new ArrayList<>(paths.size()); + + for (String path : paths) { + // Paths are relative to the parent when parent is given. + String path0 = parent != null ? parent + "/" + path : path; + + ops.add(Op.delete(path0, ver)); + } + + for (;;) { + long connStartTime = this.connStartTime; + + try { + zk.multi(ops); + + return; + } + catch (KeeperException.NoNodeException e) { + throw e; + } + catch (Exception e) { + onZookeeperError(connStartTime, e); + } + } + } + + /** + * @param path Path. + * @param ver Version. + * @throws KeeperException.NoNodeException If target node does not exist. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. 
+ */ + private void delete(String path, int ver) + throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException + { + // Retry loop: onZookeeperError either returns (retry) or throws. + for (;;) { + long connStartTime = this.connStartTime; + + try { + zk.delete(path, ver); + + return; + } + catch (KeeperException.NoNodeException e) { + throw e; + } + catch (Exception e) { + onZookeeperError(connStartTime, e); + } + } + } + + /** + * Sets node data (with retries); {@code null} data is replaced with an empty array. + * + * @param path Path. + * @param data Data. + * @param ver Version. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + * @throws KeeperException.NoNodeException If node does not exist. + * @throws KeeperException.BadVersionException If version does not match. + */ + void setData(String path, byte[] data, int ver) + throws ZookeeperClientFailedException, InterruptedException, KeeperException.NoNodeException, + KeeperException.BadVersionException + { + if (data == null) + data = EMPTY_BYTES; + + for (;;) { + long connStartTime = this.connStartTime; + + try { + zk.setData(path, data, ver); + + return; + } + catch (KeeperException.BadVersionException | KeeperException.NoNodeException e) { + throw e; + } + catch (Exception e) { + onZookeeperError(connStartTime, e); + } + } + } + + /** + * @param path Path. + * @param stat Optional {@link Stat} instance to return znode state. + * @return Data. + * @throws KeeperException.NoNodeException If target node does not exist. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ + byte[] getData(String path, @Nullable Stat stat) + throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException { + for (;;) { + long connStartTime = this.connStartTime; + + try { + return zk.getData(path, false, stat); + } + catch (KeeperException.NoNodeException e) { + throw e; + } + catch (Exception e) { + onZookeeperError(connStartTime, e); + } + } + } + + /** + * @param path Path. 
+ * @return Data. + * @throws KeeperException.NoNodeException If target node does not exist. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ + byte[] getData(String path) + throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException + { + return getData(path, null); + } + + /** + * Starts asynchronous delete of the given node. + * + * @param path Path. + */ + void deleteIfExistsAsync(String path) { + new DeleteIfExistsOperation(path).execute(); + } + + /** + * Starts asynchronous exists request; the operation object is handed to the callback wrapper. + * + * @param path Path. + * @param watcher Watcher. + * @param cb Callback. + */ + void existsAsync(String path, Watcher watcher, AsyncCallback.StatCallback cb) { + ExistsOperation op = new ExistsOperation(path, watcher, cb); + + zk.exists(path, watcher, new StatCallbackWrapper(op), null); + } + + /** + * Starts asynchronous get children request; the operation object is handed to the callback wrapper. + * + * @param path Path. + * @param watcher Watcher. + * @param cb Callback. + */ + void getChildrenAsync(String path, Watcher watcher, AsyncCallback.Children2Callback cb) { + GetChildrenOperation op = new GetChildrenOperation(path, watcher, cb); + + zk.getChildren(path, watcher, new ChildrenCallbackWrapper(op), null); + } + + /** + * Starts asynchronous get data request; the operation object is handed to the callback wrapper. + * + * @param path Path. + * @param watcher Watcher. + * @param cb Callback. + */ + void getDataAsync(String path, Watcher watcher, AsyncCallback.DataCallback cb) { + GetDataOperation op = new GetDataOperation(path, watcher, cb); + + zk.getData(path, watcher, new DataCallbackWrapper(op), null); + } + + /** + * @param path Path. + * @param data Data. + * @param createMode Create mode. + * @param cb Callback. 
+ */ + private void createAsync(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) { + if (data == null) + data = EMPTY_BYTES; + + CreateOperation op = new CreateOperation(path, data, createMode, cb); + + zk.create(path, data, ZK_ACL, createMode, new CreateCallbackWrapper(op), null); + } + + /** + * Marks client as closing and wakes up threads waiting on the state mutex. + */ + void onCloseStart() { + closing = true; + + synchronized (stateMux) { + stateMux.notifyAll(); + } + } + + /** + * Closes ZooKeeper client and cancels the connection timer. + */ + public void close() { + closeClient(); + } + + /** + * Handles failure of a synchronous operation. For errors that can be retried waits on the state mutex for + * retry timeout and returns, so that the caller repeats the operation; if connection is not restored within + * connection loss timeout, or the error can not be retried, marks connection as lost, closes client, + * runs lost connection callback and throws. + * + * @param prevConnStartTime Time when connection was established. + * @param e Error. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ + private void onZookeeperError(long prevConnStartTime, Exception e) + throws ZookeeperClientFailedException, InterruptedException + { + ZookeeperClientFailedException err = null; + + synchronized (stateMux) { + if (closing) + throw new ZookeeperClientFailedException("ZooKeeper client is closed."); + + U.warn(log, "Failed to execute ZooKeeper operation [err=" + e + ", state=" + state + ']'); + + if (state == ConnectionState.Lost) { + U.error(log, "Operation failed with unexpected error, connection lost: " + e, e); + + throw new ZookeeperClientFailedException(e); + } + + boolean retry = (e instanceof KeeperException) && needRetry(((KeeperException)e).code().intValue()); + + if (retry) { + long remainingTime; + + // First failure for this connection: switch to disconnected state and start counting loss timeout. + if (state == ConnectionState.Connected && connStartTime == prevConnStartTime) { + state = ConnectionState.Disconnected; + + connStartTime = System.currentTimeMillis(); + + remainingTime = connLossTimeout; + } + else { + assert connStartTime != 0; + + assert state == ConnectionState.Disconnected : state; + + remainingTime = connLossTimeout - (System.currentTimeMillis() - connStartTime); + + if (remainingTime <= 0) { + state = ConnectionState.Lost; + + U.warn(log, "Failed to establish ZooKeeper connection, close client " + + "[timeout=" + 
connLossTimeout + ']'); + + err = new ZookeeperClientFailedException(e); + } + } + + if (err == null) { + U.warn(log, "ZooKeeper operation failed, will retry [err=" + e + + ", retryTimeout=" + RETRY_TIMEOUT + + ", connLossTimeout=" + connLossTimeout + + ", path=" + ((KeeperException)e).getPath() + + ", remainingWaitTime=" + remainingTime + ']'); + + stateMux.wait(RETRY_TIMEOUT); + + if (closing) + throw new ZookeeperClientFailedException("ZooKeeper client is closed."); + } + } + else { + U.error(log, "Operation failed with unexpected error, close ZooKeeper client: " + e, e); + + state = ConnectionState.Lost; + + err = new ZookeeperClientFailedException(e); + } + } + + if (err != null) { + closeClient(); + + notifyConnectionLost(); + + throw err; + } + } + + /** + * @param code Zookeeper error code. + * @return {@code True} if can retry operation. + */ + private boolean needRetry(int code) { + return code == KeeperException.Code.CONNECTIONLOSS.intValue() || + code == KeeperException.Code.SESSIONMOVED.intValue() || + code == KeeperException.Code.OPERATIONTIMEOUT.intValue(); + } + + /** + * Closes ZooKeeper client (close failure is only logged) and cancels the connection timer. + */ + private void closeClient() { + try { + zk.close(); + } + catch (Exception closeErr) { + U.warn(log, "Failed to close ZooKeeper client: " + closeErr, closeErr); + } + + connTimer.cancel(); + } + + /** + * Schedules connection timeout task to run after connection loss timeout. + */ + private void scheduleConnectionCheck() { + assert state == ConnectionState.Disconnected : state; + + connTimer.schedule(new ConnectionTimeoutTask(connStartTime), connLossTimeout); + } + + /** + * Async operation which can be executed again. + */ + interface ZkAsyncOperation { + /** + * Starts the operation. + */ + void execute(); + } + + /** + * Get children operation, {@link #execute()} calls {@code getChildrenAsync} with the stored arguments. + */ + class GetChildrenOperation implements ZkAsyncOperation { + /** */ + private final String path; + + /** */ + private final Watcher watcher; + + /** */ + private final AsyncCallback.Children2Callback cb; + + /** + * @param path Path. + * @param watcher Watcher. + * @param cb Callback. 
+ */ + GetChildrenOperation(String path, Watcher watcher, AsyncCallback.Children2Callback cb) { + this.path = path; + this.watcher = watcher; + this.cb = cb; + } + + /** {@inheritDoc} */ + @Override public void execute() { + getChildrenAsync(path, watcher, cb); + } + } + + /** + * + */ + class GetDataOperation implements ZkAsyncOperation { + /** */ + private final String path; + + /** */ + private final Watcher watcher; + + /** */ + private final AsyncCallback.DataCallback cb; + + /** + * @param path Path. + * @param watcher Watcher. + * @param cb Callback. + */ + GetDataOperation(String path, Watcher watcher, AsyncCallback.DataCallback cb) { + this.path = path; + this.watcher = watcher; + this.cb = cb; + } + + /** {@inheritDoc} */ + @Override public void execute() { + getDataAsync(path, watcher, cb); + } + } + + /** + * + */ + class ExistsOperation implements ZkAsyncOperation { + /** */ + private final String path; + + /** */ + private final Watcher watcher; + + /** */ + private final AsyncCallback.StatCallback cb; + + /** + * @param path Path. + * @param watcher Watcher. + * @param cb Callback. + */ + ExistsOperation(String path, Watcher watcher, AsyncCallback.StatCallback cb) { + this.path = path; + this.watcher = watcher; + this.cb = cb; + } + + /** {@inheritDoc} */ + @Override public void execute() { + existsAsync(path, watcher, cb); + } + } + + /** + * + */ + class CreateOperation implements ZkAsyncOperation { + /** */ + private final String path; + + /** */ + private final byte[] data; + + /** */ + private final CreateMode createMode; + + /** */ + private final AsyncCallback.StringCallback cb; + + /** + * @param path path. + * @param data Data. + * @param createMode Create mode. + * @param cb Callback. 
+ */ + CreateOperation(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) { + this.path = path; + this.data = data; + this.createMode = createMode; + this.cb = cb; + } + + /** {@inheritDoc} */ + @Override public void execute() { + createAsync(path, data, createMode, cb); + } + } + + /** + * + */ + class DeleteIfExistsOperation implements AsyncCallback.VoidCallback, ZkAsyncOperation { + /** */ + private final String path; + + /** + * @param path Path. + */ + DeleteIfExistsOperation(String path) { + this.path = path; + } + + /** {@inheritDoc} */ + @Override public void execute() { + zk.delete(path, -1, this, null); + } + + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx) { + if (closing) + return; + + if (rc == KeeperException.Code.NONODE.intValue()) + return; + + if (needRetry(rc)) { + U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [" + + "path=" + path + ']'); + + retryQ.add(this); + } + else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) + U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']'); + else + assert rc == 0 : KeeperException.Code.get(rc); + } + } + + /** + * + */ + class CreateCallbackWrapper implements AsyncCallback.StringCallback { + /** */ + final CreateOperation op; + + /** + * @param op Operation. + */ + CreateCallbackWrapper(CreateOperation op) { + this.op = op; + } + + @Override public void processResult(int rc, String path, Object ctx, String name) { + if (closing) + return; + + if (rc == KeeperException.Code.NODEEXISTS.intValue()) + return; + + if (needRetry(rc)) { + U.warn(log, "Failed to execute async operation, connection lost. 
Will retry after connection restore [path=" + path + ']'); + + retryQ.add(op); + } + else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) + U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']'); + else { + if (op.cb != null) + op.cb.processResult(rc, path, ctx, name); + } + } + } + + /** + * + */ + class ChildrenCallbackWrapper implements AsyncCallback.Children2Callback { + /** */ + private final GetChildrenOperation op; + + /** + * @param op Operation. + */ + private ChildrenCallbackWrapper(GetChildrenOperation op) { + this.op = op; + } + + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + if (closing) + return; + + if (needRetry(rc)) { + U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']'); + + retryQ.add(op); + } + else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) + U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']'); + else + op.cb.processResult(rc, path, ctx, children, stat); + } + } + + /** + * + */ + class DataCallbackWrapper implements AsyncCallback.DataCallback { + /** */ + private final GetDataOperation op; + + /** + * @param op Operation. + */ + private DataCallbackWrapper(GetDataOperation op) { + this.op = op; + } + + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + if (closing) + return; + + if (needRetry(rc)) { + U.warn(log, "Failed to execute async operation, connection lost. 
Will retry after connection restore [path=" + path + ']'); + + retryQ.add(op); + } + else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) + U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']'); + else + op.cb.processResult(rc, path, ctx, data, stat); + } + } + + /** + * + */ + class StatCallbackWrapper implements AsyncCallback.StatCallback { + /** */ + private final ExistsOperation op; + + /** + * @param op Operation. + */ + private StatCallbackWrapper(ExistsOperation op) { + this.op = op; + } + + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx, Stat stat) { + if (closing) + return; + + if (needRetry(rc)) { + U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']'); + + retryQ.add(op); + } + else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) + U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']'); + else + op.cb.processResult(rc, path, ctx, stat); + } + } + + /** + * + */ + private class ConnectionTimeoutTask extends TimerTask { + /** */ + private final long connectStartTime; + + /** + * @param connectStartTime Time was connection started. 
+ */ + ConnectionTimeoutTask(long connectStartTime) { + this.connectStartTime = connectStartTime; + } + + /** {@inheritDoc} */ + @Override public void run() { + boolean connLoss = false; + + synchronized (stateMux) { + if (closing) + return; + + if (state == ConnectionState.Disconnected && + ZookeeperClient.this.connStartTime == connectStartTime) { + + state = ConnectionState.Lost; + + U.warn(log, "Failed to establish ZooKeeper connection, close client " + + "[timeout=" + connLossTimeout + ']'); + + connLoss = true; + } + } + + if (connLoss) { + closeClient(); + + notifyConnectionLost(); + } + } + } + + /** + * + */ + private enum ConnectionState { + /** */ + Connected, + /** */ + Disconnected, + /** */ + Lost + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java new file mode 100644 index 0000000..01d011b --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.spi.discovery.zk.internal;

/**
 * Checked exception reporting that the ZooKeeper client has failed. It can be created either
 * from a plain message or from the error that caused the failure.
 */
class ZookeeperClientFailedException extends Exception {
    /** Serial version uid. */
    private static final long serialVersionUID = 0L;

    /**
     * Creates an exception wrapping the underlying error.
     *
     * @param cause Cause.
     */
    ZookeeperClientFailedException(Throwable cause) {
        super(cause);
    }

    /**
     * Creates an exception carrying only a description of the failure.
     *
     * @param msg Message.
     */
    ZookeeperClientFailedException(String msg) {
        super(msg);
    }
}