http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java new file mode 100644 index 0000000..20dba12 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -0,0 +1,4483 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.ByteArrayInputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.zip.DataFormatException; +import java.util.zip.Deflater; +import java.util.zip.Inflater; +import java.util.zip.InflaterInputStream; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CommunicationFailureResolver; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.ClusterMetricsSnapshot; +import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.marshaller.MarshallerUtils; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.plugin.security.SecurityCredentials; +import org.apache.ignite.spi.IgniteNodeValidationResult; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiTimeoutObject; +import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; +import org.apache.ignite.spi.discovery.DiscoverySpiListener; +import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; +import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; +import org.apache.ignite.thread.IgniteThreadPoolExecutor; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.data.Stat; +import org.jboss.netty.util.internal.ConcurrentHashMap; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; +import static 
org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2; +import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL; +import static org.apache.zookeeper.CreateMode.PERSISTENT; + +/** + * + */ +public class ZookeeperDiscoveryImpl { + /** */ + static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD"; + + /** */ + static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT"; + + /** */ + static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS"; + + /** */ + private static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_EVTS_THROTTLE = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_EVTS_THROTTLE"; + + /** */ + final ZookeeperDiscoverySpi spi; + + /** */ + private final String igniteInstanceName; + + /** */ + private final String connectString; + + /** */ + private final int sesTimeout; + + /** */ + private final JdkMarshaller marsh = new JdkMarshaller(); + + /** */ + private final ZkIgnitePaths zkPaths; + + /** */ + private final IgniteLogger log; + + /** */ + final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** */ + private final ZookeeperClusterNode locNode; + + /** */ + private final DiscoverySpiListener lsnr; + + /** */ + private final DiscoverySpiDataExchange exchange; + + /** */ + private final boolean clientReconnectEnabled; + + /** */ + private final GridFutureAdapter<Void> joinFut = new GridFutureAdapter<>(); + + /** */ + private final 
int evtsAckThreshold; + + /** */ + private IgniteThreadPoolExecutor utilityPool; + + /** */ + private ZkRuntimeState rtState; + + /** */ + private volatile ConnectionState connState = ConnectionState.STARTED; + + /** */ + private final AtomicBoolean stop = new AtomicBoolean(); + + /** */ + private final Object stateMux = new Object(); + + /** */ + public volatile IgniteDiscoverySpiInternalListener internalLsnr; + + /** */ + private final ConcurrentHashMap<Long, PingFuture> pingFuts = new ConcurrentHashMap<>(); + + /** */ + private final AtomicReference<ZkCommunicationErrorProcessFuture> commErrProcFut = new AtomicReference<>(); + + /** + * @param spi Discovery SPI. + * @param igniteInstanceName Instance name. + * @param log Logger. + * @param zkRootPath Zookeeper base path node all nodes. + * @param locNode Local node instance. + * @param lsnr Discovery events listener. + * @param exchange Discovery data exchange. + * @param internalLsnr Internal listener (used for testing only). + */ + public ZookeeperDiscoveryImpl( + ZookeeperDiscoverySpi spi, + String igniteInstanceName, + IgniteLogger log, + String zkRootPath, + ZookeeperClusterNode locNode, + DiscoverySpiListener lsnr, + DiscoverySpiDataExchange exchange, + IgniteDiscoverySpiInternalListener internalLsnr) { + assert locNode.id() != null && locNode.isLocal() : locNode; + + zkRootPath = zkRootPath.trim(); + + if (zkRootPath.endsWith(ZkIgnitePaths.PATH_SEPARATOR)) + zkRootPath = zkRootPath.substring(0, zkRootPath.length() - 1); + + ZkIgnitePaths.validatePath(zkRootPath); + + MarshallerUtils.setNodeName(marsh, igniteInstanceName); + + zkPaths = new ZkIgnitePaths(zkRootPath); + + this.spi = spi; + this.igniteInstanceName = igniteInstanceName; + this.connectString = spi.getZkConnectionString(); + this.sesTimeout = (int)spi.getSessionTimeout(); + this.log = log.getLogger(getClass()); + this.locNode = locNode; + this.lsnr = lsnr; + this.exchange = exchange; + this.clientReconnectEnabled = locNode.isClient() && 
!spi.isClientReconnectDisabled(); + + int evtsAckThreshold = IgniteSystemProperties.getInteger(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, 5); + + if (evtsAckThreshold <= 0) + evtsAckThreshold = 1; + + this.evtsAckThreshold = evtsAckThreshold; + + if (internalLsnr != null) + this.internalLsnr = internalLsnr; + } + + /** + * @return Exception. + */ + private static IgniteClientDisconnectedCheckedException disconnectError() { + return new IgniteClientDisconnectedCheckedException(null, "Client node disconnected."); + } + + /** + * @return Logger. + */ + IgniteLogger log() { + return log; + } + + /** + * @return Local node instance. + */ + public ClusterNode localNode() { + return locNode; + } + + /** + * @param nodeId Node ID. + * @return Node instance. + */ + @Nullable public ZookeeperClusterNode node(UUID nodeId) { + assert nodeId != null; + + return rtState.top.nodesById.get(nodeId); + } + + /** + * @param nodeOrder Node order. + * @return Node instance. + */ + @Nullable public ZookeeperClusterNode node(long nodeOrder) { + assert nodeOrder > 0 : nodeOrder; + + return rtState.top.nodesByOrder.get(nodeOrder); + } + + /** + * @param fut Future to remove. + */ + void clearCommunicationErrorProcessFuture(ZkCommunicationErrorProcessFuture fut) { + assert fut.isDone() : fut; + + commErrProcFut.compareAndSet(fut, null); + } + + /** + * @param node0 Problem node ID + * @param err Connect error. 
+ */ + public void resolveCommunicationError(ClusterNode node0, Exception err) { + ZookeeperClusterNode node = node(node0.id()); + + if (node == null) + throw new IgniteSpiException(new ClusterTopologyCheckedException("Node failed: " + node0.id())); + + IgniteInternalFuture<Boolean> nodeStatusFut; + + for (;;) { + checkState(); + + ZkCommunicationErrorProcessFuture fut = commErrProcFut.get(); + + if (fut == null || fut.isDone()) { + ZkCommunicationErrorProcessFuture newFut = ZkCommunicationErrorProcessFuture.createOnCommunicationError( + this, + node.sessionTimeout() + 1000); + + if (commErrProcFut.compareAndSet(fut, newFut)) { + fut = newFut; + + if (log.isInfoEnabled()) { + log.info("Created new communication error process future [errNode=" + node0.id() + + ", err=" + err + ']'); + } + + try { + checkState(); + } + catch (Exception e) { + fut.onError(e); + + throw e; + } + + fut.scheduleCheckOnTimeout(); + } + else { + fut = commErrProcFut.get(); + + if (fut == null) + continue; + } + } + + nodeStatusFut = fut.nodeStatusFuture(node); + + if (nodeStatusFut != null) + break; + else { + try { + fut.get(); + } + catch (IgniteCheckedException e) { + U.warn(log, "Previous communication error process future failed: " + e); + } + } + } + + try { + if (!nodeStatusFut.get()) + throw new IgniteSpiException(new ClusterTopologyCheckedException("Node failed: " + node0.id())); + } + catch (IgniteCheckedException e) { + throw new IgniteSpiException(e); + } + } + + /** + * @param nodeId Node ID. + * @return Ping result. 
+ */ + public boolean pingNode(UUID nodeId) { + checkState(); + + ZkRuntimeState rtState = this.rtState; + + ZookeeperClusterNode node = rtState.top.nodesById.get(nodeId); + + if (node == null) + return false; + + if (node.isLocal()) + return true; + + PingFuture fut = pingFuts.get(node.order()); + + if (fut == null) { + fut = new PingFuture(rtState, node); + + PingFuture old = pingFuts.putIfAbsent(node.order(), fut); + + if (old == null) { + if (fut.checkNodeAndState()) + spi.getSpiContext().addTimeoutObject(fut); + else + assert fut.isDone(); + } + else + fut = old; + } + + try { + return fut.get(); + } + catch (IgniteCheckedException e) { + throw new IgniteSpiException(e); + } + } + + /** + * @param nodeId Node ID. + * @param warning Warning. + */ + public void failNode(UUID nodeId, @Nullable String warning) { + ZookeeperClusterNode node = rtState.top.nodesById.get(nodeId); + + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Ignore forcible node fail request, node does not exist: " + nodeId); + + return; + } + + if (!node.isClient()) { + U.warn(log, "Ignore forcible node fail request for non-client node: " + node); + + return; + } + + sendCustomMessage(new ZkForceNodeFailMessage(node.internalId(), warning)); + } + + /** + * + */ + public void reconnect() { + assert clientReconnectEnabled; + + synchronized (stateMux) { + if (connState == ConnectionState.STARTED) { + connState = ConnectionState.DISCONNECTED; + + rtState.onCloseStart(disconnectError()); + } + else + return; + } + + busyLock.block(); + + busyLock.unblock(); + + rtState.zkClient.close(); + + UUID newId = UUID.randomUUID(); + + U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due to network problems [" + + "newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode + ']'); + + runInWorkerThread(new ReconnectClosure(newId)); + } + + /** + * @param newId New ID. 
+ */ + private void doReconnect(UUID newId) { + if (rtState.joined) { + assert rtState.evtsData != null; + + lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED, + rtState.evtsData.topVer, + locNode, + rtState.top.topologySnapshot(), + Collections.<Long, Collection<ClusterNode>>emptyMap(), + null); + } + + try { + locNode.onClientDisconnected(newId); + + joinTopology(rtState); + } + catch (Exception e) { + if (stopping()) { + if (log.isDebugEnabled()) + log.debug("Reconnect failed, node is stopping [err=" + e + ']'); + + return; + } + + U.error(log, "Failed to reconnect: " + e, e); + + onSegmented(e); + } + } + + /** + * @return {@code True} if started to stop. + */ + private boolean stopping() { + if (stop.get()) + return true; + + synchronized (stateMux) { + if (connState == ConnectionState.STOPPED) + return true; + } + + return false; + } + + /** + * @param e Error. + */ + private void onSegmented(Exception e) { + rtState.errForClose = e; + + if (rtState.joined || joinFut.isDone()) { + synchronized (stateMux) { + connState = ConnectionState.STOPPED; + } + + notifySegmented(); + } + else + joinFut.onDone(e); + } + + /** + * + */ + private void notifySegmented() { + List<ClusterNode> nodes = rtState.top.topologySnapshot(); + + if (nodes.isEmpty()) + nodes = Collections.singletonList((ClusterNode)locNode); + + lsnr.onDiscovery(EVT_NODE_SEGMENTED, + rtState.evtsData != null ? rtState.evtsData.topVer : 1L, + locNode, + nodes, + Collections.<Long, Collection<ClusterNode>>emptyMap(), + null); + } + + /** + * @return Remote nodes. + */ + public Collection<ClusterNode> remoteNodes() { + checkState(); + + return rtState.top.remoteNodes(); + } + + /** + * + */ + private void checkState() { + switch (connState) { + case STARTED: + break; + + case STOPPED: + throw new IgniteSpiException("Node stopped."); + + case DISCONNECTED: + throw new IgniteClientDisconnectedException(null, "Client is disconnected."); + } + } + + /** + * @param nodeId Node ID. 
+ * @return {@code True} if node joined or joining topology. + */ + public boolean knownNode(UUID nodeId) { + while (!busyLock.enterBusy()) + checkState(); + + try { + List<String> children = rtState.zkClient.getChildren(zkPaths.aliveNodesDir); + + for (int i = 0; i < children.size(); i++) { + UUID id = ZkIgnitePaths.aliveNodeId(children.get(i)); + + if (nodeId.equals(id)) + return true; + } + + return false; + } + catch (ZookeeperClientFailedException e) { + if (clientReconnectEnabled) + throw new IgniteClientDisconnectedException(null, "Client is disconnected."); + + throw new IgniteException(e); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedException(e); + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param msg Message. + */ + public void sendCustomMessage(DiscoverySpiCustomMessage msg) { + assert msg != null; + + byte[] msgBytes; + + try { + msgBytes = marshalZip(msg); + } + catch (IgniteCheckedException e) { + throw new IgniteSpiException("Failed to marshal custom message: " + msg, e); + } + + while (!busyLock.enterBusy()) + checkState(); + + try { + ZookeeperClient zkClient = rtState.zkClient; + + saveCustomMessage(zkClient, msgBytes); + } + catch (ZookeeperClientFailedException e) { + if (clientReconnectEnabled) + throw new IgniteClientDisconnectedException(null, "Client is disconnected."); + + throw new IgniteException(e); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedException(e); + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param zkClient Client. + * @param msgBytes Marshalled message. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. 
+ */ + private void saveCustomMessage(ZookeeperClient zkClient, byte[] msgBytes) + throws ZookeeperClientFailedException, InterruptedException + { + String prefix = UUID.randomUUID().toString(); + + int partCnt = 1; + + int overhead = 10; + + UUID locId = locNode.id(); + + String path = zkPaths.createCustomEventPath(prefix, locId, partCnt); + + if (zkClient.needSplitNodeData(path, msgBytes, overhead)) { + List<byte[]> parts = zkClient.splitNodeData(path, msgBytes, overhead); + + String partsBasePath = zkPaths.customEventPartsBasePath(prefix, locId); + + saveMultipleParts(zkClient, partsBasePath, parts); + + msgBytes = null; + + partCnt = parts.size(); + } + + zkClient.createSequential(prefix, + zkPaths.customEvtsDir, + zkPaths.createCustomEventPath(prefix, locId, partCnt), + msgBytes, + CreateMode.PERSISTENT_SEQUENTIAL); + } + + /** + * @return Cluster start time. + */ + public long gridStartTime() { + return rtState.gridStartTime; + } + + /** + * Starts join procedure and waits for {@link EventType#EVT_NODE_JOINED} event for local node. + * + * @throws InterruptedException If interrupted. + */ + public void startJoinAndWait() throws InterruptedException { + joinTopology(null); + + for (;;) { + try { + joinFut.get(10_000); + + break; + } + catch (IgniteFutureTimeoutCheckedException e) { + U.warn(log, "Waiting for local join event [nodeId=" + locNode.id() + ", name=" + igniteInstanceName + ']'); + } + catch (Exception e) { + IgniteSpiException spiErr = X.cause(e, IgniteSpiException.class); + + if (spiErr != null) + throw spiErr; + + throw new IgniteSpiException("Failed to join cluster", e); + } + } + } + + /** + * @param prevState Previous state in case of connect retry. + * @throws InterruptedException If interrupted. 
+ */ + private void joinTopology(@Nullable ZkRuntimeState prevState) throws InterruptedException { + if (!busyLock.enterBusy()) + return; + + try { + boolean reconnect = prevState != null; + + // Need fire EVT_CLIENT_NODE_RECONNECTED event if reconnect after already joined. + boolean prevJoined = prevState != null && prevState.joined; + + IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr; + + if (internalLsnr != null) + internalLsnr.beforeJoin(locNode, log); + + if (locNode.isClient() && reconnect) + locNode.setAttributes(spi.getSpiContext().nodeAttributes()); + + marshalCredentialsOnJoin(locNode); + + synchronized (stateMux) { + if (connState == ConnectionState.STOPPED) + return; + + connState = ConnectionState.STARTED; + } + + ZkRuntimeState rtState = this.rtState = new ZkRuntimeState(prevJoined); + + DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id()); + + exchange.collect(discoDataBag); + + ZkJoiningNodeData joinData = new ZkJoiningNodeData(locNode, discoDataBag.joiningNodeData()); + + byte[] joinDataBytes; + + try { + joinDataBytes = marshalZip(joinData); + } + catch (Exception e) { + throw new IgniteSpiException("Failed to marshal joining node data", e); + } + + try { + rtState.zkClient = new ZookeeperClient( + igniteInstanceName, + log, + connectString, + sesTimeout, + new ConnectionLossListener()); + } + catch (Exception e) { + throw new IgniteSpiException("Failed to create Zookeeper client", e); + } + + startJoin(rtState, prevState, joinDataBytes); + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @throws InterruptedException If interrupted. + */ + private void initZkNodes() throws InterruptedException { + try { + ZookeeperClient client = rtState.zkClient; + + if (client.exists(zkPaths.aliveNodesDir)) + return; // This path is created last, assume all others dirs are created. 
+ + if (!client.exists(zkPaths.aliveNodesDir)) + createRootPathParents(zkPaths.clusterDir, client); + + List<String> dirs = new ArrayList<>(); + + dirs.add(zkPaths.clusterDir); + dirs.add(zkPaths.evtsPath); + dirs.add(zkPaths.joinDataDir); + dirs.add(zkPaths.customEvtsDir); + dirs.add(zkPaths.customEvtsPartsDir); + dirs.add(zkPaths.customEvtsAcksDir); + dirs.add(zkPaths.aliveNodesDir); + + try { + client.createAll(dirs, PERSISTENT); + } + catch (KeeperException.NodeExistsException e) { + if (log.isDebugEnabled()) + log.debug("Failed to create nodes using bulk operation: " + e); + + for (String dir : dirs) + client.createIfNeeded(dir, null, PERSISTENT); + } + } + catch (ZookeeperClientFailedException e) { + throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e); + } + } + + /** + * @param rootDir Root directory. + * @param client Client. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ + private void createRootPathParents(String rootDir, ZookeeperClient client) + throws ZookeeperClientFailedException, InterruptedException { + int startIdx = 0; + + for (;;) { + int separatorIdx = rootDir.indexOf(ZkIgnitePaths.PATH_SEPARATOR, startIdx); + + if (separatorIdx == -1) + break; + + if (separatorIdx > 0) { + String path = rootDir.substring(0, separatorIdx); + + client.createIfNeeded(path, null, CreateMode.PERSISTENT); + } + + startIdx = separatorIdx + 1; + } + } + + /** + * @param zkClient Client. + * @param basePath Base path. + * @param partCnt Parts count. + */ + private void deleteMultiplePartsAsync(ZookeeperClient zkClient, String basePath, int partCnt) { + for (int i = 0; i < partCnt; i++) { + String path = multipartPathName(basePath, i); + + zkClient.deleteIfExistsAsync(path); + } + } + + /** + * @param zkClient Client. + * @param basePath Base path. + * @param partCnt Parts count. + * @return Read parts. + * @throws Exception If failed. 
+ */ + private byte[] readMultipleParts(ZookeeperClient zkClient, String basePath, int partCnt) + throws Exception { + assert partCnt >= 1; + + if (partCnt > 1) { + List<byte[]> parts = new ArrayList<>(partCnt); + + int totSize = 0; + + for (int i = 0; i < partCnt; i++) { + byte[] part = zkClient.getData(multipartPathName(basePath, i)); + + parts.add(part); + + totSize += part.length; + } + + byte[] res = new byte[totSize]; + + int pos = 0; + + for (int i = 0; i < partCnt; i++) { + byte[] part = parts.get(i); + + System.arraycopy(part, 0, res, pos, part.length); + + pos += part.length; + } + + return res; + } + else + return zkClient.getData(multipartPathName(basePath, 0)); + } + + /** + * @param zkClient Client. + * @param basePath Base path. + * @param parts Data parts. + * @return Number of parts. + * @throws ZookeeperClientFailedException If client failed. + * @throws InterruptedException If interrupted. + */ + private int saveMultipleParts(ZookeeperClient zkClient, String basePath, List<byte[]> parts) + throws ZookeeperClientFailedException, InterruptedException + { + assert parts.size() > 1; + + for (int i = 0; i < parts.size(); i++) { + byte[] part = parts.get(i); + + String path = multipartPathName(basePath, i); + + zkClient.createIfNeeded(path, part, PERSISTENT); + } + + return parts.size(); + } + + /** + * @param basePath Base path. + * @param part Part number. + * @return Path. + */ + private static String multipartPathName(String basePath, int part) { + return basePath + String.format("%04d", part); + } + + /** + * @param rtState Runtime state. + * @param joinDataBytes Joining node data. + * @param prevState Previous state in case of connect retry. + * @throws InterruptedException If interrupted. 
+ */ + private void startJoin(ZkRuntimeState rtState, @Nullable ZkRuntimeState prevState, final byte[] joinDataBytes) + throws InterruptedException + { + try { + long startTime = System.currentTimeMillis(); + + initZkNodes(); + + String prefix = UUID.randomUUID().toString(); + + rtState.init(new ZkWatcher(rtState), new AliveNodeDataWatcher(rtState)); + + ZookeeperClient zkClient = rtState.zkClient; + + final int OVERHEAD = 5; + + // TODO ZK: need clean up join data if failed before was able to create alive node. + String joinDataPath = zkPaths.joinDataDir + "/" + prefix + ":" + locNode.id(); + + if (zkClient.needSplitNodeData(joinDataPath, joinDataBytes, OVERHEAD)) { + List<byte[]> parts = zkClient.splitNodeData(joinDataPath, joinDataBytes, OVERHEAD); + + rtState.joinDataPartCnt = parts.size(); + + saveMultipleParts(zkClient, joinDataPath + ":", parts); + + joinDataPath = zkClient.createIfNeeded( + joinDataPath, + marshalZip(new ZkJoiningNodeData(parts.size())), + PERSISTENT); + } + else { + joinDataPath = zkClient.createIfNeeded( + joinDataPath, + joinDataBytes, + PERSISTENT); + } + + rtState.locNodeZkPath = zkClient.createSequential( + prefix, + zkPaths.aliveNodesDir, + zkPaths.aliveNodePathForCreate(prefix, locNode), + null, + EPHEMERAL_SEQUENTIAL); + + rtState.internalOrder = ZkIgnitePaths.aliveInternalId(rtState.locNodeZkPath); + + if (log.isInfoEnabled()) { + log.info("Node started join [nodeId=" + locNode.id() + + ", instanceName=" + locNode.attribute(ATTR_IGNITE_INSTANCE_NAME) + + ", joinDataSize=" + joinDataBytes.length + + (rtState.joinDataPartCnt > 1 ? 
(", joinDataPartCnt=" + rtState.joinDataPartCnt) : "") + + ", consistentId=" + locNode.consistentId() + + ", initTime=" + (System.currentTimeMillis() - startTime) + + ", nodePath=" + rtState.locNodeZkPath + ']'); + } + + /* + If node can not join due to validation error this error is reported in join data, + As a minor optimization do not start watch join data immediately, but only if do not receive + join event after some timeout. + */ + CheckJoinErrorWatcher joinErrorWatcher = new CheckJoinErrorWatcher(5000, joinDataPath, rtState); + + rtState.joinErrTo = joinErrorWatcher.timeoutObj; + + if (locNode.isClient() && spi.getJoinTimeout() > 0) { + ZkTimeoutObject joinTimeoutObj = prevState != null ? prevState.joinTo : null; + + if (joinTimeoutObj == null) { + joinTimeoutObj = new JoinTimeoutObject(spi.getJoinTimeout()); + + spi.getSpiContext().addTimeoutObject(joinTimeoutObj); + } + + rtState.joinTo = joinTimeoutObj; + } + + if (!locNode.isClient()) + zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback(rtState)); + + zkClient.getDataAsync(zkPaths.evtsPath, rtState.watcher, rtState.watcher); + + spi.getSpiContext().addTimeoutObject(rtState.joinErrTo); + } + catch (IgniteCheckedException | ZookeeperClientFailedException e) { + throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e); + } + } + + /** + * Authenticate local node. + * + * @param nodeAuth Authenticator. + * @param locCred Local security credentials for authentication. + * @throws IgniteSpiException If any error occurs. + */ + private void localAuthentication(DiscoverySpiNodeAuthenticator nodeAuth, SecurityCredentials locCred){ + assert nodeAuth != null; + assert locCred != null; + + try { + SecurityContext subj = nodeAuth.authenticateNode(locNode, locCred); + + // Note: exception message is checked in tests. 
+ if (subj == null) + throw new IgniteSpiException("Authentication failed for local node."); + + if (!(subj instanceof Serializable)) + throw new IgniteSpiException("Authentication subject is not Serializable."); + + Map<String, Object> attrs = new HashMap<>(locNode.attributes()); + + attrs.put(ATTR_SECURITY_SUBJECT_V2, U.marshal(marsh, subj)); + + locNode.setAttributes(attrs); + } + catch (Exception e) { + throw new IgniteSpiException("Failed to authenticate local node (will shutdown local node).", e); + } + } + + /** + * @param node Node. + * @param zipBytes Zip-compressed marshalled security subject. + * @throws Exception If failed. + */ + private void setNodeSecuritySubject(ZookeeperClusterNode node, byte[] zipBytes) throws Exception { + assert zipBytes != null; + + Map<String, Object> attrs = new HashMap<>(node.getAttributes()); + + attrs.put(ATTR_SECURITY_SUBJECT_V2, unzip(zipBytes)); + + node.setAttributes(attrs); + } + + /** + * @param node Node. + * @return Credentials. + * @throws IgniteCheckedException If failed to unmarshal. + */ + private SecurityCredentials unmarshalCredentials(ZookeeperClusterNode node) throws Exception { + byte[] credBytes = (byte[])node.getAttributes().get(ATTR_SECURITY_CREDENTIALS); + + if (credBytes == null) + return null; + + return unmarshalZip(credBytes); + } + + /** + * Marshalls credentials with discovery SPI marshaller (will replace attribute value). + * + * @param node Node to marshall credentials for. + * @throws IgniteSpiException If marshalling failed. + */ + private void marshalCredentialsOnJoin(ZookeeperClusterNode node) throws IgniteSpiException { + try { + // Use security-unsafe getter. 
+ Map<String, Object> attrs0 = node.getAttributes(); + + Object creds = attrs0.get(ATTR_SECURITY_CREDENTIALS); + + if (creds != null) { + Map<String, Object> attrs = new HashMap<>(attrs0); + + assert !(creds instanceof byte[]); + + attrs.put(ATTR_SECURITY_CREDENTIALS, marshalZip(creds)); + + node.setAttributes(attrs); + } + } + catch (IgniteCheckedException e) { + throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e); + } + } + + /** + * + */ + private class UpdateProcessedEventsTimeoutObject extends ZkTimeoutObject { + /** */ + private final ZkRuntimeState rtState; + + /** + * @param rtState Runtime state. + * @param timeout Timeout. + */ + UpdateProcessedEventsTimeoutObject(ZkRuntimeState rtState, long timeout) { + super(timeout); + + this.rtState = rtState; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + runInWorkerThread(new ZkRunnable(rtState, ZookeeperDiscoveryImpl.this) { + @Override protected void run0() throws Exception { + updateProcessedEventsOnTimeout(rtState, UpdateProcessedEventsTimeoutObject.this); + } + }); + } + } + + /** + * + */ + private class JoinTimeoutObject extends ZkTimeoutObject { + /** + * @param timeout Timeout. + */ + JoinTimeoutObject(long timeout) { + super(timeout); + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + if (cancelled || rtState.joined) + return; + + runInWorkerThread(new Runnable() { + @Override public void run() { + synchronized (stateMux) { + if (cancelled || rtState.joined) + return; + + if (connState == ConnectionState.STOPPED) + return; + + connState = ConnectionState.STOPPED; + } + + U.warn(log, "Failed to connect to cluster, either connection to ZooKeeper can not be established or there " + + "are no alive server nodes (consider increasing 'joinTimeout' configuration property) [" + + "joinTimeout=" + spi.getJoinTimeout() + ']'); + + // Note: exception message is checked in tests. 
+ onSegmented(new IgniteSpiException("Failed to connect to cluster within configured timeout")); + } + }); + } + } + + /** + * + */ + private class CheckJoinErrorWatcher extends ZkAbstractWatcher implements AsyncCallback.DataCallback { + /** */ + private final String joinDataPath; + + /** */ + private ZkTimeoutObject timeoutObj; + + /** + * @param timeout Timeout. + * @param joinDataPath0 Node joined data path. + * @param rtState0 State. + */ + CheckJoinErrorWatcher(long timeout, String joinDataPath0, ZkRuntimeState rtState0) { + super(rtState0, ZookeeperDiscoveryImpl.this); + + this.joinDataPath = joinDataPath0; + + timeoutObj = new ZkTimeoutObject(timeout) { + @Override public void onTimeout() { + if (rtState.errForClose != null || rtState.joined) + return; + + synchronized (stateMux) { + if (connState != ConnectionState.STARTED) + return; + } + + rtState.zkClient.getDataAsync(joinDataPath, + CheckJoinErrorWatcher.this, + CheckJoinErrorWatcher.this); + } + }; + } + + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + if (rc != 0) + return; + + if (!onProcessStart()) + return; + + try { + Object obj = unmarshalZip(data); + + if (obj instanceof ZkInternalJoinErrorMessage) { + ZkInternalJoinErrorMessage joinErr = (ZkInternalJoinErrorMessage)obj; + + onSegmented(new IgniteSpiException(joinErr.err)); + } + + onProcessEnd(); + } + catch (Throwable e) { + onProcessError(e); + } + } + + /** {@inheritDoc} */ + @Override public void process0(WatchedEvent evt) { + if (rtState.errForClose != null || rtState.joined) + return; + + if (evt.getType() == Event.EventType.NodeDataChanged) + rtState.zkClient.getDataAsync(evt.getPath(), this, this); + } + } + + /** + * @param aliveNodes Alive nodes. + * @throws Exception If failed. 
+ */ + private void checkIsCoordinator(final List<String> aliveNodes) throws Exception { + assert !locNode.isClient(); + + TreeMap<Long, String> aliveSrvs = new TreeMap<>(); + + long locInternalOrder = rtState.internalOrder; + + for (String aliveNodePath : aliveNodes) { + if (ZkIgnitePaths.aliveNodeClientFlag(aliveNodePath)) + continue; + + Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath); + + aliveSrvs.put(internalId, aliveNodePath); + } + + assert !aliveSrvs.isEmpty(); + + Map.Entry<Long, String> crdE = aliveSrvs.firstEntry(); + + if (locInternalOrder == crdE.getKey()) + onBecomeCoordinator(aliveNodes); + else { + assert aliveSrvs.size() > 1 : aliveSrvs; + + Map.Entry<Long, String> prevE = aliveSrvs.floorEntry(locInternalOrder - 1); + + assert prevE != null; + + if (log.isInfoEnabled()) { + log.info("Discovery coordinator already exists, watch for previous server node [" + + "locId=" + locNode.id() + + ", watchPath=" + prevE.getValue() + ']'); + } + + PreviousNodeWatcher watcher = new ServerPreviousNodeWatcher(rtState); + + rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + prevE.getValue(), watcher, watcher); + } + } + + /** + * @param aliveNodes Alive nodes. + * @throws Exception If failed. 
+ */ + private void checkClientsStatus(final List<String> aliveNodes) throws Exception { + assert locNode.isClient() : locNode; + assert rtState.joined; + assert rtState.evtsData != null; + + TreeMap<Long, String> aliveClients = new TreeMap<>(); + + String srvPath = null; + Long srvInternalOrder = null; + + long locInternalOrder = rtState.internalOrder; + + for (String aliveNodePath : aliveNodes) { + Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath); + + if (ZkIgnitePaths.aliveNodeClientFlag(aliveNodePath)) + aliveClients.put(internalId, aliveNodePath); + else { + if (srvInternalOrder == null || internalId < srvInternalOrder) { + srvPath = aliveNodePath; + srvInternalOrder = internalId; + } + } + } + + assert !aliveClients.isEmpty(); + + Map.Entry<Long, String> oldest = aliveClients.firstEntry(); + + boolean oldestClient = locInternalOrder == oldest.getKey(); + + if (srvPath == null) { + if (oldestClient) { + Stat stat = new Stat(); + + ZkDiscoveryEventsData prevEvts = rtState.evtsData; + + byte[] evtsBytes = rtState.zkClient.getData(zkPaths.evtsPath, stat); + + assert evtsBytes.length > 0; + + ZkDiscoveryEventsData newEvts = unmarshalZip(evtsBytes); + + if (prevEvts.clusterId.equals(newEvts.clusterId)) { + U.warn(log, "All server nodes failed, notify all clients [locId=" + locNode.id() + ']'); + + generateNoServersEvent(newEvts, stat); + } + else + U.warn(log, "All server nodes failed (received events from new cluster)."); + } + } + else { + String watchPath; + + if (oldestClient) { + watchPath = srvPath; + + if (log.isInfoEnabled()) { + log.info("Servers exists, watch for server node [locId=" + locNode.id() + + ", watchPath=" + watchPath + ']'); + } + } + else { + assert aliveClients.size() > 1 : aliveClients; + + Map.Entry<Long, String> prevE = aliveClients.floorEntry(locInternalOrder - 1); + + assert prevE != null; + + watchPath = prevE.getValue(); + + if (log.isInfoEnabled()) { + log.info("Servers exists, watch for previous node [locId=" + 
locNode.id() + + ", watchPath=" + watchPath + ']'); + } + } + + PreviousNodeWatcher watcher = new ClientPreviousNodeWatcher(rtState); + + rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + watchPath, watcher, watcher); + } + } + + /** + * @param evtsData Events data. + * @param evtsStat Events zookeeper state. + * @throws Exception If failed. + */ + private void generateNoServersEvent(ZkDiscoveryEventsData evtsData, Stat evtsStat) throws Exception { + evtsData.evtIdGen++; + + ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData( + evtsData.evtIdGen, + 0L, + evtsData.topVer, + locNode.id(), + new ZkNoServersMessage(), + null); + + Collection<ZookeeperClusterNode> nodesToAck = Collections.emptyList(); + + evtsData.addEvent(nodesToAck, evtData); + + byte[] newEvtsBytes = marshalZip(evtsData); + + try { + rtState.zkClient.setData(zkPaths.evtsPath, newEvtsBytes, evtsStat.getVersion()); + } + catch (KeeperException.BadVersionException e) { + // Version can change if new cluster started and saved new events. + if (log.isDebugEnabled()) + log.debug("Failed to save no servers message"); + } + } + + /** + * @param lastEvts Last events from previous coordinator. + * @throws Exception If failed. + */ + private void previousCoordinatorCleanup(ZkDiscoveryEventsData lastEvts) throws Exception { + for (ZkDiscoveryEventData evtData : lastEvts.evts.values()) { + if (evtData instanceof ZkDiscoveryCustomEventData) { + ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData; + + // It is possible previous coordinator failed before finished cleanup. 
+ if (evtData0.msg instanceof ZkCommunicationErrorResolveFinishMessage) { + try { + ZkCommunicationErrorResolveFinishMessage msg = + (ZkCommunicationErrorResolveFinishMessage)evtData0.msg; + + ZkCommunicationErrorResolveResult res = unmarshalZip( + ZkDistributedCollectDataFuture.readResult(rtState.zkClient, zkPaths, msg.futId)); + + deleteAliveNodes(res.killedNodes); + } + catch (KeeperException.NoNodeException ignore) { + // No-op. + } + } + else if (evtData0.resolvedMsg instanceof ZkForceNodeFailMessage) + deleteAliveNode(((ZkForceNodeFailMessage)evtData0.resolvedMsg).nodeInternalId); + } + } + } + + /** + * @param aliveNodes Alive nodes paths. + * @throws Exception If failed. + */ + private void onBecomeCoordinator(List<String> aliveNodes) throws Exception { + ZkDiscoveryEventsData prevEvts = processNewEvents(rtState.zkClient.getData(zkPaths.evtsPath)); + + rtState.crd = true; + + if (rtState.joined) { + if (log.isInfoEnabled()) + log.info("Node is new discovery coordinator [locId=" + locNode.id() + ']'); + + assert locNode.order() > 0 : locNode; + assert rtState.evtsData != null; + + previousCoordinatorCleanup(rtState.evtsData); + + UUID futId = rtState.evtsData.communicationErrorResolveFutureId(); + + if (futId != null) { + if (log.isInfoEnabled()) { + log.info("New discovery coordinator will handle already started cluster-wide communication " + + "error resolve [reqId=" + futId + ']'); + } + + ZkCommunicationErrorProcessFuture fut = commErrProcFut.get(); + + ZkDistributedCollectDataFuture collectResFut = collectCommunicationStatusFuture(futId); + + if (fut != null) + fut.nodeResultCollectFuture(collectResFut); + } + + for (ZkDiscoveryEventData evtData : rtState.evtsData.evts.values()) + evtData.initRemainingAcks(rtState.top.nodesByOrder.values()); + + handleProcessedEvents("crd"); + } + else { + String locAlivePath = rtState.locNodeZkPath.substring(rtState.locNodeZkPath.lastIndexOf('/') + 1); + + deleteJoiningNodeData(locNode.id(), + 
ZkIgnitePaths.aliveNodePrefixId(locAlivePath), + rtState.joinDataPartCnt); + + DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator(); + + if (nodeAuth != null) { + try { + if (log.isInfoEnabled()) { + log.info("Node is first server node in cluster, try authenticate local node " + + "[locId=" + locNode.id() + ']'); + } + + localAuthentication(nodeAuth, unmarshalCredentials(locNode)); + } + catch (Exception e) { + U.warn(log, "Local node authentication failed: " + e, e); + + onSegmented(e); + + // Stop any further processing. + throw new ZookeeperClientFailedException("Local node authentication failed: " + e); + } + } + + newClusterStarted(prevEvts); + } + + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher); + rtState.zkClient.getChildrenAsync(zkPaths.customEvtsDir, rtState.watcher, rtState.watcher); + + for (String alivePath : aliveNodes) + watchAliveNodeData(alivePath); + } + + /** + * @param alivePath Node path. + */ + private void watchAliveNodeData(String alivePath) { + assert rtState.locNodeZkPath != null; + + String path = zkPaths.aliveNodesDir + "/" + alivePath; + + if (!path.equals(rtState.locNodeZkPath)) + rtState.zkClient.getDataAsync(path, rtState.aliveNodeDataWatcher, rtState.aliveNodeDataWatcher); + } + + /** + * @param aliveNodes ZK nodes representing alive cluster nodes. + * @throws Exception If failed. 
+ */ + private void generateTopologyEvents(List<String> aliveNodes) throws Exception { + assert rtState.crd; + + if (log.isInfoEnabled()) + log.info("Process alive nodes change [alives=" + aliveNodes.size() + "]"); + + TreeMap<Long, String> alives = new TreeMap<>(); + + for (String child : aliveNodes) { + Long internalId = ZkIgnitePaths.aliveInternalId(child); + + Object old = alives.put(internalId, child); + + assert old == null; + } + + TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>(rtState.top.nodesByOrder); + + int newEvts = 0; + + final int MAX_NEW_EVTS = IgniteSystemProperties.getInteger(IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS, 100); + + List<ZookeeperClusterNode> failedNodes = null; + + for (Map.Entry<Long, ZookeeperClusterNode> e : rtState.top.nodesByInternalId.entrySet()) { + if (!alives.containsKey(e.getKey())) { + ZookeeperClusterNode failedNode = e.getValue(); + + if (failedNodes == null) + failedNodes = new ArrayList<>(); + + failedNodes.add(failedNode); + + generateNodeFail(curTop, failedNode); + + newEvts++; + + if (newEvts == MAX_NEW_EVTS) { + saveAndProcessNewEvents(); + + if (log.isInfoEnabled()) { + log.info("Delay alive nodes change process, max event threshold reached [newEvts=" + newEvts + + ", totalEvts=" + rtState.evtsData.evts.size() + ']'); + } + + handleProcessedEventsOnNodesFail(failedNodes); + + throttleNewEventsGeneration(); + + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher); + + return; + } + } + } + + // Process failures before processing join, otherwise conflicts are possible in case of fast node stop/re-start. 
+ if (newEvts > 0) { + saveAndProcessNewEvents(); + + handleProcessedEventsOnNodesFail(failedNodes); + + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher); + + return; + } + + generateJoinEvents(curTop, alives, MAX_NEW_EVTS); + + if (failedNodes != null) + handleProcessedEventsOnNodesFail(failedNodes); + } + + private void generateJoinEvents(TreeMap<Long, ZookeeperClusterNode> curTop, + TreeMap<Long, String> alives, + final int MAX_NEW_EVTS) throws Exception + { + ZkBulkJoinContext joinCtx = new ZkBulkJoinContext(); + + for (Map.Entry<Long, String> e : alives.entrySet()) { + Long internalId = e.getKey(); + + if (!rtState.top.nodesByInternalId.containsKey(internalId)) { + UUID rslvFutId = rtState.evtsData.communicationErrorResolveFutureId(); + + if (rslvFutId != null) { + if (log.isInfoEnabled()) { + log.info("Delay alive nodes change process while communication error resolve " + + "is in progress [reqId=" + rslvFutId + ']'); + } + + break; + } + + processJoinOnCoordinator(joinCtx, curTop, internalId, e.getValue()); + + if (joinCtx.nodes() == MAX_NEW_EVTS) { + generateBulkJoinEvent(curTop, joinCtx); + + if (log.isInfoEnabled()) { + log.info("Delay alive nodes change process, max event threshold reached [" + + "newEvts=" + joinCtx.nodes() + + ", totalEvts=" + rtState.evtsData.evts.size() + ']'); + } + + throttleNewEventsGeneration(); + + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher); + + return; + } + } + } + + if (joinCtx.nodes() > 0) + generateBulkJoinEvent(curTop, joinCtx); + } + + private void generateBulkJoinEvent(TreeMap<Long, ZookeeperClusterNode> curTop, ZkBulkJoinContext joinCtx) + throws Exception + { + rtState.evtsData.evtIdGen++; + + long evtId = rtState.evtsData.evtIdGen; + + List<T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>>> nodes = joinCtx.nodes; + + assert nodes != null && nodes.size() > 0; + + int nodeCnt = nodes.size(); + + List<ZkJoinedNodeEvtData> 
joinedNodes = new ArrayList<>(nodeCnt); + + Map<Long, byte[]> discoDataMap = U.newHashMap(nodeCnt); + Map<Long, Long> dupDiscoData = null; + + for (int i = 0; i < nodeCnt; i++) { + T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>> nodeEvtData = nodes.get(i); + + Map<Integer, Serializable> discoData = nodeEvtData.get2(); + + byte[] discoDataBytes = U.marshal(marsh, discoData); + + Long dupDataNode = null; + + for (Map.Entry<Long, byte[]> e : discoDataMap.entrySet()) { + if (Arrays.equals(discoDataBytes, e.getValue())) { + dupDataNode = e.getKey(); + + break; + } + } + + long nodeTopVer = nodeEvtData.get1().topVer; + + if (dupDataNode != null) { + if (dupDiscoData == null) + dupDiscoData = new HashMap<>(); + + Long old = dupDiscoData.put(nodeTopVer, dupDataNode); + + assert old == null : old; + } + else + discoDataMap.put(nodeTopVer, discoDataBytes); + + joinedNodes.add(nodeEvtData.get1()); + } + + int overhead = 5; + + ZkJoinEventDataForJoined dataForJoined = new ZkJoinEventDataForJoined( + new ArrayList<>(curTop.values()), + discoDataMap, + dupDiscoData); + + byte[] dataForJoinedBytes = marshalZip(dataForJoined); + + long addDataStart = System.currentTimeMillis(); + + int dataForJoinedPartCnt = saveData(zkPaths.joinEventDataPathForJoined(evtId), + dataForJoinedBytes, + overhead); + + long addDataTime = System.currentTimeMillis() - addDataStart; + + ZkDiscoveryNodeJoinEventData evtData = new ZkDiscoveryNodeJoinEventData( + evtId, + rtState.evtsData.topVer, + joinedNodes, + dataForJoinedPartCnt); + + rtState.evtsData.addEvent(curTop.values(), evtData); + + if (log.isInfoEnabled()) { + log.info("Generated NODE_JOINED event [" + + "nodeCnt=" + nodeCnt + + ", dataForJoinedSize=" + dataForJoinedBytes.length + + ", dataForJoinedPartCnt=" + dataForJoinedPartCnt + + ", addDataTime=" + addDataTime + + ", evt=" + evtData + ']'); + } + + saveAndProcessNewEvents(); + } + + /** + * + */ + private void throttleNewEventsGeneration() { + long delay = 
IgniteSystemProperties.getLong(IGNITE_ZOOKEEPER_DISCOVERY_SPI_EVTS_THROTTLE, 0); + + if (delay > 0) { + if (log.isInfoEnabled()) + log.info("Sleep delay before generate new events [delay=" + delay + ']'); + + try { + Thread.sleep(delay); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * @param nodeId Node ID. + * @param prefixId Path prefix. + * @return Join data. + * @throws Exception If failed. + */ + private ZkJoiningNodeData unmarshalJoinData(UUID nodeId, UUID prefixId) throws Exception { + String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId); + + byte[] joinData = rtState.zkClient.getData(joinDataPath); + + Object dataObj = unmarshalZip(joinData); + + if (!(dataObj instanceof ZkJoiningNodeData)) + throw new Exception("Invalid joined node data: " + dataObj); + + ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)dataObj; + + if (joiningNodeData.partCount() > 1) { + joinData = readMultipleParts(rtState.zkClient, joinDataPath + ":", joiningNodeData.partCount()); + + joiningNodeData = unmarshalZip(joinData); + } + + return joiningNodeData; + } + + /** + * @param nodeId Node ID. + * @param prefixId Path prefix. + * @param aliveNodePath Node path. + * @return Join data. + * @throws Exception If failed. 
+ */ + private Object unmarshalJoinDataOnCoordinator(UUID nodeId, UUID prefixId, String aliveNodePath) throws Exception { + String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId); + + byte[] joinData = rtState.zkClient.getData(joinDataPath); + + Object dataObj; + + try { + dataObj = unmarshalZip(joinData); + + if (dataObj instanceof ZkInternalJoinErrorMessage) + return dataObj; + } + catch (Exception e) { + U.error(log, "Failed to unmarshal joining node data [nodePath=" + aliveNodePath + "']", e); + + return new ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath), + "Failed to unmarshal join data: " + e); + } + + assert dataObj instanceof ZkJoiningNodeData : dataObj; + + ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)dataObj; + + if (joiningNodeData.partCount() > 1) { + joinData = readMultipleParts(rtState.zkClient, joinDataPath + ":", joiningNodeData.partCount()); + + try { + joiningNodeData = unmarshalZip(joinData); + } + catch (Exception e) { + U.error(log, "Failed to unmarshal joining node data [nodePath=" + aliveNodePath + "']", e); + + return new ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath), + "Failed to unmarshal join data: " + e); + } + } + + assert joiningNodeData.node() != null : joiningNodeData; + + return joiningNodeData; + } + + /** + * @param curTop Current nodes. + * @param internalId Joined node internal ID. + * @param aliveNodePath Joined node path. + * @throws Exception If failed. + * @return {@code True} if new join event was added. 
+ */ + private boolean processJoinOnCoordinator( + ZkBulkJoinContext joinCtx, + TreeMap<Long, ZookeeperClusterNode> curTop, + long internalId, + String aliveNodePath) + throws Exception + { + UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath); + UUID prefixId = ZkIgnitePaths.aliveNodePrefixId(aliveNodePath); + + Object data = unmarshalJoinDataOnCoordinator(nodeId, prefixId, aliveNodePath); + + if (data instanceof ZkJoiningNodeData) { + ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)data; + + ZkNodeValidateResult validateRes = validateJoiningNode(joiningNodeData.node()); + + if (validateRes.err == null) { + ZookeeperClusterNode joinedNode = joiningNodeData.node(); + + assert nodeId.equals(joinedNode.id()) : joiningNodeData.node(); + + addJoinedNode( + joinCtx, + curTop, + joiningNodeData, + internalId, + prefixId, + validateRes.secSubjZipBytes); + + watchAliveNodeData(aliveNodePath); + + return true; + } + else { + ZkInternalJoinErrorMessage joinErr = new ZkInternalJoinErrorMessage( + ZkIgnitePaths.aliveInternalId(aliveNodePath), + validateRes.err); + + processJoinError(aliveNodePath, nodeId, prefixId, joinErr); + + return false; + } + } + else { + assert data instanceof ZkInternalJoinErrorMessage : data; + + processJoinError(aliveNodePath, nodeId, prefixId, (ZkInternalJoinErrorMessage)data); + + return false; + } + } + + /** + * @param aliveNodePath Joined node path. + * @param nodeId Node ID. + * @param prefixId Path prefix ID. + * @param joinErr Join error message. + * @throws Exception If failed. 
+ */ + private void processJoinError(String aliveNodePath, + UUID nodeId, + UUID prefixId, + ZkInternalJoinErrorMessage joinErr) throws Exception { + ZookeeperClient client = rtState.zkClient; + + if (joinErr.notifyNode) { + String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId); + + client.setData(joinDataPath, marshalZip(joinErr), -1); + + client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); + } + else { + if (log.isInfoEnabled()) + log.info("Ignore join data, node was failed by previous coordinator: " + aliveNodePath); + + client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); + } + } + + /** + * @param node Joining node. + * @return Validation result. + */ + private ZkNodeValidateResult validateJoiningNode(ZookeeperClusterNode node) { + ZookeeperClusterNode node0 = rtState.top.nodesById.get(node.id()); + + if (node0 != null) { + U.error(log, "Failed to include node in cluster, node with the same ID already exists [joiningNode=" + node + + ", existingNode=" + node0 + ']'); + + // Note: exception message is checked in tests. + return new ZkNodeValidateResult("Node with the same ID already exists: " + node0); + } + + ZkNodeValidateResult res = authenticateNode(node); + + if (res.err != null) + return res; + + IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node); + + if (err != null) { + LT.warn(log, err.message()); + + res.err = err.sendMessage(); + } + + return res; + } + + /** + * @param node Node. + * @return Validation result. 
+ */ + private ZkNodeValidateResult authenticateNode(ZookeeperClusterNode node) { + DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator(); + + if (nodeAuth == null) + return new ZkNodeValidateResult((byte[])null); + + SecurityCredentials cred; + + try { + cred = unmarshalCredentials(node); + } + catch (Exception e) { + U.error(log, "Failed to unmarshal node credentials: " + e, e); + + return new ZkNodeValidateResult("Failed to unmarshal node credentials"); + } + + SecurityContext subj = nodeAuth.authenticateNode(node, cred); + + if (subj == null) { + U.warn(log, "Authentication failed [nodeId=" + node.id() + + ", addrs=" + U.addressesAsString(node) + ']', + "Authentication failed [nodeId=" + U.id8(node.id()) + ", addrs=" + + U.addressesAsString(node) + ']'); + + // Note: exception message test is checked in tests. + return new ZkNodeValidateResult("Authentication failed"); + } + + if (!(subj instanceof Serializable)) { + U.warn(log, "Authentication subject is not Serializable [nodeId=" + node.id() + + ", addrs=" + U.addressesAsString(node) + ']', + "Authentication subject is not Serializable [nodeId=" + U.id8(node.id()) + + ", addrs=" + + U.addressesAsString(node) + ']'); + + return new ZkNodeValidateResult("Authentication subject is not serializable"); + } + + byte[] secSubjZipBytes; + + try { + secSubjZipBytes = marshalZip(subj); + } + catch (Exception e) { + U.error(log, "Failed to marshal node security subject: " + e, e); + + return new ZkNodeValidateResult("Failed to marshal node security subject"); + } + + return new ZkNodeValidateResult(secSubjZipBytes); + } + + /** + * @throws Exception If failed. 
+ */ + private void saveAndProcessNewEvents() throws Exception { + long start = System.currentTimeMillis(); + + byte[] evtsBytes = marshalZip(rtState.evtsData); + + rtState.zkClient.setData(zkPaths.evtsPath, evtsBytes, -1); + + long time = System.currentTimeMillis() - start; + + if (log.isInfoEnabled()) { + log.info("Discovery coordinator saved new topology events [topVer=" + rtState.evtsData.topVer + + ", size=" + evtsBytes.length + + ", evts=" + rtState.evtsData.evts.size() + + ", lastEvt=" + rtState.evtsData.evtIdGen + + ", saveTime=" + time + ']'); + } + + processNewEvents(rtState.evtsData); + } + + /** + * @param curTop Current topology. + * @param failedNode Failed node. + */ + private void generateNodeFail(TreeMap<Long, ZookeeperClusterNode> curTop, ZookeeperClusterNode failedNode) { + Object rmvd = curTop.remove(failedNode.order()); + + assert rmvd != null; + + rtState.evtsData.topVer++; + rtState.evtsData.evtIdGen++; + + ZkDiscoveryNodeFailEventData evtData = new ZkDiscoveryNodeFailEventData( + rtState.evtsData.evtIdGen, + rtState.evtsData.topVer, + failedNode.internalId()); + + rtState.evtsData.addEvent(curTop.values(), evtData); + + if (log.isInfoEnabled()) + log.info("Generated NODE_FAILED event [evt=" + evtData + ']'); + } + + /** + * @param curTop Current nodes. + * @param joiningNodeData Join data. + * @param internalId Joined node internal ID. + * @param prefixId Unique path prefix. + * @param secSubjZipBytes Marshalled security subject. + * @throws Exception If failed. 
     */
    private void addJoinedNode(
        ZkBulkJoinContext joinCtx,
        TreeMap<Long, ZookeeperClusterNode> curTop,
        ZkJoiningNodeData joiningNodeData,
        long internalId,
        UUID prefixId,
        @Nullable byte[] secSubjZipBytes)
        throws Exception
    {
        ZookeeperClusterNode joinedNode = joiningNodeData.node();

        UUID nodeId = joinedNode.id();

        // Each joined node consumes the next topology version as its order.
        rtState.evtsData.topVer++;

        joinedNode.order(rtState.evtsData.topVer);
        joinedNode.internalId(internalId);

        // Exchange discovery data: first apply the joining node's data locally...
        DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId);

        joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData());

        exchange.onExchange(joiningNodeBag);

        // ...then collect the grid's common data to send back to the joining node.
        DiscoveryDataBag collectBag = new DiscoveryDataBag(nodeId, new HashSet<Integer>());

        exchange.collect(collectBag);

        Map<Integer, Serializable> commonData = collectBag.commonData();

        Object old = curTop.put(joinedNode.order(), joinedNode);

        assert old == null;

        // Presumably extra bytes reserved per saved znode when splitting data — TODO confirm.
        int overhead = 5;

        int secSubjPartCnt = 0;

        if (secSubjZipBytes != null) {
            // Security subject is stored separately (may be split into multiple parts).
            secSubjPartCnt = saveData(zkPaths.joinEventSecuritySubjectPath(joinedNode.order()),
                secSubjZipBytes,
                overhead);

            assert secSubjPartCnt > 0 : secSubjPartCnt;

            setNodeSecuritySubject(joinedNode, secSubjZipBytes);
        }

        ZkJoinedNodeEvtData nodeEvtData = new ZkJoinedNodeEvtData(
            rtState.evtsData.topVer,
            joinedNode.id(),
            joinedNode.internalId(),
            prefixId,
            joiningNodeData.partCount(),
            secSubjPartCnt);

        nodeEvtData.joiningNodeData = joiningNodeData;

        // Accumulate into the bulk-join context; the caller emits one NODE_JOINED event for all.
        joinCtx.addJoinedNode(nodeEvtData, commonData);

        rtState.evtsData.onNodeJoin(joinedNode);
    }

    /**
     * Saves bytes to the given path, splitting into multiple znodes when the payload is too large.
     *
     * @param path Path to save.
     * @param bytes Bytes to save.
     * @param overhead Extra overhead.
     * @return Parts count.
     * @throws Exception If failed.
+ */ + private int saveData(String path, byte[] bytes, int overhead) throws Exception { + int dataForJoinedPartCnt = 1; + + if (rtState.zkClient.needSplitNodeData(path, bytes, overhead)) { + dataForJoinedPartCnt = saveMultipleParts(rtState.zkClient, + path, + rtState.zkClient.splitNodeData(path, bytes, overhead)); + } + else { + rtState.zkClient.createIfNeeded(multipartPathName(path, 0), + bytes, + PERSISTENT); + } + + return dataForJoinedPartCnt; + } + + /** + * @param prevEvts Events from previous cluster. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + private void newClusterStarted(@Nullable ZkDiscoveryEventsData prevEvts) throws Exception { + assert !locNode.isClient() : locNode; + + long locInternalId = rtState.internalOrder; + + assert prevEvts == null || prevEvts.maxInternalOrder < locInternalId; + + spi.getSpiContext().removeTimeoutObject(rtState.joinErrTo); + + cleanupPreviousClusterData(prevEvts != null ? prevEvts.maxInternalOrder + 1 : -1L); + + rtState.joined = true; + rtState.gridStartTime = System.currentTimeMillis(); + + rtState.evtsData = ZkDiscoveryEventsData.createForNewCluster(rtState.gridStartTime); + + if (log.isInfoEnabled()) { + log.info("New cluster started [locId=" + locNode.id() + + ", clusterId=" + rtState.evtsData.clusterId + + ", startTime=" + rtState.evtsData.clusterStartTime + ']'); + } + + locNode.internalId(locInternalId); + locNode.order(1); + + rtState.evtsData.onNodeJoin(locNode); + + rtState.top.addNode(locNode); + + final List<ClusterNode> topSnapshot = Collections.singletonList((ClusterNode)locNode); + + lsnr.onDiscovery(EVT_NODE_JOINED, + 1L, + locNode, + topSnapshot, + Collections.<Long, Collection<ClusterNode>>emptyMap(), + null); + + // Reset events (this is also notification for clients left from previous cluster). 
+ rtState.zkClient.setData(zkPaths.evtsPath, marshalZip(rtState.evtsData), -1); + + joinFut.onDone(); + } + + /** + * @param startInternalOrder Starting internal order for cluster (znodes having lower order belong + * to clients from previous cluster and should be removed). + + * @throws Exception If failed. + */ + private void cleanupPreviousClusterData(long startInternalOrder) throws Exception { + long start = System.currentTimeMillis(); + + ZookeeperClient client = rtState.zkClient; + + // TODO ZK: use multi, better batching + max-size safe + NoNodeException safe. + List<String> evtChildren = rtState.zkClient.getChildren(zkPaths.evtsPath); + + for (String evtPath : evtChildren) { + String evtDir = zkPaths.evtsPath + "/" + evtPath; + + removeChildren(evtDir); + } + + client.deleteAll(zkPaths.evtsPath, evtChildren, -1); + + client.deleteAll(zkPaths.customEvtsDir, + client.getChildren(zkPaths.customEvtsDir), + -1); + + rtState.zkClient.deleteAll(zkPaths.customEvtsPartsDir, + rtState.zkClient.getChildren(zkPaths.customEvtsPartsDir), + -1); + + rtState.zkClient.deleteAll(zkPaths.customEvtsAcksDir, + rtState.zkClient.getChildren(zkPaths.customEvtsAcksDir), + -1); + + if (startInternalOrder > 0) { + for (String alive : rtState.zkClient.getChildren(zkPaths.aliveNodesDir)) { + if (ZkIgnitePaths.aliveInternalId(alive) < startInternalOrder) + rtState.zkClient.deleteIfExists(zkPaths.aliveNodesDir + "/" + alive, -1); + } + } + + long time = System.currentTimeMillis() - start; + + if (time > 0) { + if (log.isInfoEnabled()) + log.info("Previous cluster data cleanup time: " + time); + } + } + + /** + * @param path Path. + * @throws Exception If failed. + */ + private void removeChildren(String path) throws Exception { + rtState.zkClient.deleteAll(path, rtState.zkClient.getChildren(path), -1); + } + + /** + * @param zkClient Client. + * @param evtPath Event path. + * @param sndNodeId Sender node ID. + * @return Event data. + * @throws Exception If failed. 
 */
private byte[] readCustomEventData(ZookeeperClient zkClient, String evtPath, UUID sndNodeId) throws Exception {
    int partCnt = ZkIgnitePaths.customEventPartsCount(evtPath);

    if (partCnt > 1) {
        // Large event payloads are split into multiple znodes; reassemble from the parts path.
        String partsBasePath = zkPaths.customEventPartsBasePath(
            ZkIgnitePaths.customEventPrefix(evtPath), sndNodeId);

        return readMultipleParts(zkClient, partsBasePath, partCnt);
    }
    else
        return zkClient.getData(zkPaths.customEvtsDir + "/" + evtPath);
}

/**
 * Coordinator-only: scans custom event znodes and, in sequence order, generates discovery
 * events for those not processed yet (sequence greater than {@code evtsData.procCustEvt}).
 *
 * @param customEvtNodes ZK nodes representing custom events to process.
 * @throws Exception If failed.
 */
private void generateCustomEvents(List<String> customEvtNodes) throws Exception {
    assert rtState.crd;

    ZookeeperClient zkClient = rtState.zkClient;
    ZkDiscoveryEventsData evtsData = rtState.evtsData;

    // TreeMap keyed by event sequence so events are handled in submission order.
    TreeMap<Integer, String> unprocessedEvts = null;

    for (int i = 0; i < customEvtNodes.size(); i++) {
        String evtPath = customEvtNodes.get(i);

        int evtSeq = ZkIgnitePaths.customEventSequence(evtPath);

        if (evtSeq > evtsData.procCustEvt) {
            if (unprocessedEvts == null)
                unprocessedEvts = new TreeMap<>();

            unprocessedEvts.put(evtSeq, evtPath);
        }
    }

    if (unprocessedEvts == null)
        return;

    for (Map.Entry<Integer, String> evtE : unprocessedEvts.entrySet()) {
        evtsData.procCustEvt = evtE.getKey();

        String evtPath = evtE.getValue();

        UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtPath);

        ZookeeperClusterNode sndNode = rtState.top.nodesById.get(sndNodeId);

        if (sndNode != null) {
            byte[] evtBytes = readCustomEventData(zkClient, evtPath, sndNodeId);

            DiscoverySpiCustomMessage msg;

            try {
                msg = unmarshalZip(evtBytes);
            }
            catch (Exception e) {
                U.error(log, "Failed to unmarshal custom discovery message: " + e, e);

                // Drop undecodable event data so it is not retried forever.
                deleteCustomEventDataAsync(rtState.zkClient, evtPath);

                continue;
            }

            generateAndProcessCustomEventOnCoordinator(evtPath, sndNode, msg);
        }
        else {
            U.warn(log, "Ignore custom event from unknown node: " + sndNodeId);

            deleteCustomEventDataAsync(rtState.zkClient, evtPath);
        }
    }
}

/**
 * Coordinator-only: creates the discovery custom event for the given message, handling the
 * internal message types (forcible node fail, communication error resolve) specially.
 *
 * @param evtPath Event data path.
 * @param sndNode Sender node.
 * @param msg Message instance.
 * @throws Exception If failed.
 */
private void generateAndProcessCustomEventOnCoordinator(String evtPath,
    ZookeeperClusterNode sndNode,
    DiscoverySpiCustomMessage msg) throws Exception
{
    ZookeeperClient zkClient = rtState.zkClient;
    ZkDiscoveryEventsData evtsData = rtState.evtsData;

    ZookeeperClusterNode failedNode = null;

    if (msg instanceof ZkForceNodeFailMessage) {
        ZkForceNodeFailMessage msg0 = (ZkForceNodeFailMessage)msg;

        failedNode = rtState.top.nodesByInternalId.get(msg0.nodeInternalId);

        if (failedNode != null)
            evtsData.topVer++;
        else {
            if (log.isDebugEnabled())
                log.debug("Ignore forcible node fail request for unknown node: " + msg0.nodeInternalId);

            deleteCustomEventDataAsync(zkClient, evtPath);

            return;
        }
    }
    else if (msg instanceof ZkCommunicationErrorResolveStartMessage) {
        ZkCommunicationErrorResolveStartMessage msg0 =
            (ZkCommunicationErrorResolveStartMessage)msg;

        // Only one cluster-wide resolve process can be active at a time.
        if (evtsData.communicationErrorResolveFutureId() != null) {
            if (log.isInfoEnabled()) {
                log.info("Ignore communication error resolve message, resolve process " +
                    "already started [sndNode=" + sndNode + ']');
            }

            deleteCustomEventDataAsync(zkClient, evtPath);

            return;
        }
        else {
            if (log.isInfoEnabled()) {
                log.info("Start cluster-wide communication error resolve [sndNode=" + sndNode +
                    ", reqId=" + msg0.id +
                    ", topVer=" + evtsData.topVer + ']');
            }

            zkClient.createIfNeeded(zkPaths.distributedFutureBasePath(msg0.id),
                null,
                PERSISTENT);

            evtsData.communicationErrorResolveFutureId(msg0.id);
        }
    }

    evtsData.evtIdGen++;

    ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
        evtsData.evtIdGen,
        0L,
        evtsData.topVer,
        sndNode.id(),
        null,
        evtPath);

    evtData.resolvedMsg = msg;

    if (log.isDebugEnabled())
        log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');

    boolean fastStopProcess = false;

    if (msg instanceof ZkInternalMessage)
        processInternalMessage(evtData, (ZkInternalMessage)msg);
    else {
        notifyCustomEvent(evtData, msg);

        if (msg.stopProcess()) {
            if (log.isDebugEnabled())
                log.debug("Fast stop process custom event [evt=" + evtData + ", msg=" + msg + ']');

            fastStopProcess = true;

            // No need to process this event on others nodes, skip this event.
            evtsData.evts.remove(evtData.eventId());

            evtsData.evtIdGen--;

            DiscoverySpiCustomMessage ack = msg.ackMessage();

            if (ack != null) {
                // Fast-stopped event is replaced by its ack event (notified locally right away).
                evtData = createAckEvent(ack, evtData);

                if (log.isDebugEnabled())
                    log.debug("Generated CUSTOM event (ack for fast stop process) [evt=" + evtData + ", msg=" + msg + ']');

                notifyCustomEvent(evtData, ack);
            }
            else
                evtData = null;
        }
    }

    if (evtData != null) {
        evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);

        rtState.locNodeInfo.lastProcEvt = evtData.eventId();

        saveAndProcessNewEvents();

        if (fastStopProcess)
            deleteCustomEventDataAsync(zkClient, evtPath);

        if (failedNode != null) {
            deleteAliveNode(failedNode.internalId());

            handleProcessedEventsOnNodesFail(Collections.singletonList(failedNode));
        }
    }
}

/**
 * Asynchronously removes the alive znode of the node with the given internal ID.
 *
 * @param internalId Node internal ID.
 * @throws Exception If failed.
 */
private void deleteAliveNode(long internalId) throws Exception {
    for (String child : rtState.zkClient.getChildren(zkPaths.aliveNodesDir)) {
        if (ZkIgnitePaths.aliveInternalId(child) == internalId) {
            rtState.zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + child);

            break;
        }
    }
}

/**
 * Asynchronously deletes custom event data (including all part znodes for multi-part events).
 *
 * @param zkClient Client.
 * @param evtPath Event path.
+ */ + private void deleteCustomEventDataAsync(ZookeeperClient zkClient, String evtPath) { + if (log.isDebugEnabled()) + log.debug("Delete custom event data: " + evtPath); + + String prefix = ZkIgnitePaths.customEventPrefix(evtPath); + UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtPath); + int partCnt = ZkIgnitePaths.customEventPartsCount(evtPath); + + assert partCnt >= 1 : partCnt; + + if (partCnt > 1) { + for (int i = 0; i < partCnt; i++) { + String path0 = zkPaths.customEventPartPath(prefix, sndNodeId, i); + + zkClient.deleteIfExistsAsync(path0); + } + } + + zkClient.deleteIfExistsAsync(zkPaths.customEvtsDir + "/" + evtPath); + } + + /** + * @param data Marshalled events. + * @throws Exception If failed. + * @return Events. + */ + @Nullable private ZkDiscoveryEventsData processNewEvents(byte[] data) throws Exception { + ZkDiscoveryEventsData newEvts = data.length > 0 ? (ZkDiscoveryEventsData)unmarshalZip(data) : null; + + if (rtState.joined && (newEvts == null || !rtState.evtsData.clusterId.equals(newEvts.clusterId))) { + assert locNode.isClient() : locNode; + + throw localNodeFail("All server nodes failed, client node disconnected (received events from new custer) " + + "[locId=" + locNode.id() + ']', true); + } + + if (newEvts == null) + return null; + + assert !rtState.crd; + + // Need keep processed custom events since they contain message object which is needed to create ack. 
if (rtState.evtsData != null) {
    // Carry over already-resolved message instances into the freshly unmarshalled events,
    // so ack messages can still be created for custom events processed earlier.
    for (Map.Entry<Long, ZkDiscoveryEventData> e : rtState.evtsData.evts.entrySet()) {
        ZkDiscoveryEventData evtData = e.getValue();

        if (evtData.eventType() == ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT) {
            ZkDiscoveryCustomEventData evtData0 =
                (ZkDiscoveryCustomEventData)newEvts.evts.get(evtData.eventId());

            if (evtData0 != null)
                evtData0.resolvedMsg = ((ZkDiscoveryCustomEventData)evtData).resolvedMsg;
        }
    }
}

processNewEvents(newEvts);

if (rtState.joined)
    rtState.evtsData = newEvts;

return newEvts;
}

/**
 * Dispatches each not-yet-processed event (join/fail/custom) to the corresponding notifier.
 *
 * @param evtsData Events.
 * @throws Exception If failed.
 */
@SuppressWarnings("unchecked")
private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws Exception {
    TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts;

    ZookeeperClient zkClient = rtState.zkClient;

    boolean evtProcessed = false;
    boolean updateNodeInfo = false;

    try {
        // Iterate only over events after the last one this node already processed.
        for (ZkDiscoveryEventData evtData : evts.tailMap(rtState.locNodeInfo.lastProcEvt, false).values()) {
            if (log.isDebugEnabled())
                log.debug("New discovery event data [evt=" + evtData + ", evtsHist=" + evts.size() + ']');

            switch (evtData.eventType()) {
                case ZkDiscoveryEventData.ZK_EVT_NODE_JOIN: {
                    evtProcessed = processBulkJoin(evtsData, (ZkDiscoveryNodeJoinEventData)evtData);

                    break;
                }

                case ZkDiscoveryEventData.ZK_EVT_NODE_FAILED: {
                    if (!rtState.joined)
                        break;

                    evtProcessed = true;

                    notifyNodeFail((ZkDiscoveryNodeFailEventData)evtData);

                    break;
                }

                case ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT: {
                    if (!rtState.joined)
                        break;

                    evtProcessed = true;

                    ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData;

                    // Skip ack events generated before this node joined the topology.
                    if (evtData0.ackEvent() && evtData0.topologyVersion() < locNode.order())
                        break;

                    DiscoverySpiCustomMessage msg;

                    if (rtState.crd) {
                        assert evtData0.resolvedMsg != null : evtData0;

<TRUNCATED>