zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0bddcdef Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0bddcdef Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0bddcdef Branch: refs/heads/ignite-zk Commit: 0bddcdef32646c407e879fbe39765f877928d44c Parents: 74526d1 Author: sboikov <[email protected]> Authored: Mon Dec 4 18:09:43 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 4 18:09:43 2017 +0300 ---------------------------------------------------------------------- .../zk/internal/ZkInternalFailNodeMessage.java | 55 -------- .../ZkInternalForceNodeFailMessage.java | 55 ++++++++ .../zk/internal/ZkInternalJoinErrorMessage.java | 43 +++++++ .../zk/internal/ZookeeperDiscoveryImpl.java | 129 +++++++++++++------ .../ZookeeperDiscoverySpiBasicTest.java | 17 +++ 5 files changed, 204 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0bddcdef/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalFailNodeMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalFailNodeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalFailNodeMessage.java deleted file mode 100644 index a97289d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalFailNodeMessage.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.zk.internal; - -import java.util.UUID; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; -import org.jetbrains.annotations.Nullable; - -/** - * - */ -public class ZkInternalFailNodeMessage implements ZkInternalMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - final UUID nodeId; - - /** */ - final String warning; - - /** - * @param nodeId Node ID. - * @param warning Warning to be displayed on all nodes. - */ - ZkInternalFailNodeMessage(UUID nodeId, String warning) { - this.nodeId = nodeId; - this.warning = warning; - } - - /** {@inheritDoc} */ - @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean isMutable() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/0bddcdef/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java new file mode 100644 index 0000000..fafcafc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.UUID; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class ZkInternalForceNodeFailMessage implements ZkInternalMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + final UUID nodeId; + + /** */ + final String warning; + + /** + * @param nodeId Node ID. + * @param warning Warning to be displayed on all nodes. + */ + ZkInternalForceNodeFailMessage(UUID nodeId, String warning) { + this.nodeId = nodeId; + this.warning = warning; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0bddcdef/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java new file mode 100644 index 0000000..7e06858 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; + +/** + * + */ +class ZkInternalJoinErrorMessage implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final int nodeInternalId; + + /** */ + private final String err; + + /** + * @param nodeInternalId Joining node internal ID. + * @param err Error message. + */ + ZkInternalJoinErrorMessage(int nodeInternalId, String err) { + this.nodeInternalId = nodeInternalId; + this.err = err; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0bddcdef/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 796310f..ef67ec4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -243,7 +243,7 @@ public class ZookeeperDiscoveryImpl { return; } - sendCustomMessage(new ZkInternalFailNodeMessage(nodeId, warning)); + sendCustomMessage(new ZkInternalForceNodeFailMessage(nodeId, warning)); } /** @@ -618,27 +618,19 @@ public class ZookeeperDiscoveryImpl { TreeMap<Integer, String> alives = new TreeMap<>(); - Integer locInternalId = null; + int locInternalId = ZkIgnitePaths.aliveInternalId(state.locNodeZkPath); for (String aliveNodePath : aliveNodes) { Integer internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath); alives.put(internalId, aliveNodePath); - - if (locInternalId == null) { - UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath); - - if (locNode.id().equals(nodeId)) - locInternalId = internalId; - } } assert !alives.isEmpty(); - assert locInternalId != null; Map.Entry<Integer, String> crdE = alives.firstEntry(); - if (locInternalId.equals(crdE.getKey())) + if (locInternalId == crdE.getKey()) onBecomeCoordinator(aliveNodes, locInternalId); else { assert alives.size() > 1; @@ -746,9 +738,7 @@ public class ZookeeperDiscoveryImpl { assert old == null; if (!state.top.nodesByInternalId.containsKey(internalId)) { - generateNodeJoin(curTop, internalId, child); - - watchAliveNodeData(child); + processJoinOnCoordinator(curTop, internalId, child); newEvts = true; } @@ -779,6 +769,81 @@ public class ZookeeperDiscoveryImpl { } /** + * @param curTop Current nodes. + * @param internalId Joined node internal ID. + * @param aliveNodePath Joined node path. + * @throws Exception If failed. + */ + private void processJoinOnCoordinator(TreeMap<Long, ZookeeperClusterNode> curTop, + int internalId, + String aliveNodePath) throws Exception { + UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath); + + String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, aliveNodePath); + byte[] joinData; + + try { + joinData = state.zkClient.getData(joinDataPath); + } + catch (KeeperException.NoNodeException e) { + U.warn(log, "Failed to read joining node data, node left before join process finished: " + nodeId); + + return; + } + + String err = null; + + ZkJoiningNodeData joiningNodeData = null; + + try { + joiningNodeData = unmarshalZip(joinData); + } + catch (Exception e) { + U.error(log, "Failed to unmarshal joining node data [nodePath=" + aliveNodePath + "']", e); + + err = "Failed to unmarshal join data: " + e; + } + + if (err == null) { + assert joiningNodeData != null; + + err = validateJoiningNode(joiningNodeData.node()); + } + + if (err == null) { + ZookeeperClusterNode joinedNode = joiningNodeData.node(); + + assert nodeId.equals(joinedNode.id()) : joiningNodeData.node(); + + generateNodeJoin(curTop, joinData, joiningNodeData, internalId); + + watchAliveNodeData(aliveNodePath); + } + else { + ZkInternalJoinErrorMessage msg = new ZkInternalJoinErrorMessage(internalId, err); + + // IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node); + } + } + + /** + * @param node Joining node. + * @return + */ + @Nullable private String validateJoiningNode(ZookeeperClusterNode node) { + ZookeeperClusterNode node0 = state.top.nodesById.get(node.id()); + + if (node0 != null) { + U.error(log, "Failed to include node in cluster, node with the same ID already exists [joiningNode=" + node + + ", existingNode=" + node0 + ']'); + + return "Node with the same ID already exists"; + } + + return null; + } + + /** * @throws Exception If failed. */ private void saveAndProcessNewEvents() throws Exception { @@ -827,34 +892,18 @@ public class ZookeeperDiscoveryImpl { /** * @param curTop Current nodes. * @param internalId Joined node internal ID. - * @param aliveNodePath Joined node path. * @throws Exception If failed. */ - private void generateNodeJoin(TreeMap<Long, ZookeeperClusterNode> curTop, - int internalId, - String aliveNodePath) + private void generateNodeJoin( + TreeMap<Long, ZookeeperClusterNode> curTop, + byte[] joinData, + ZkJoiningNodeData joiningNodeData, + int internalId) throws Exception { - UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath); - - String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, aliveNodePath); - byte[] joinData; - - try { - joinData = state.zkClient.getData(joinDataPath); - } - catch (KeeperException.NoNodeException e) { - U.warn(log, "Failed to read joining node data, node left before join process finished: " + nodeId); - - return; - } - - // TODO ZK: fail node if can not unmarshal. - ZkJoiningNodeData joiningNodeData = unmarshalZip(joinData); - ZookeeperClusterNode joinedNode = joiningNodeData.node(); - assert nodeId.equals(joinedNode.id()) : joiningNodeData.node(); + UUID nodeId = joinedNode.id(); state.evtsData.topVer++; state.evtsData.evtIdGen++; @@ -1061,8 +1110,8 @@ public class ZookeeperDiscoveryImpl { state.evtsData.evtIdGen++; - if (msg instanceof ZkInternalFailNodeMessage) { - ZkInternalFailNodeMessage msg0 = (ZkInternalFailNodeMessage)msg; + if (msg instanceof ZkInternalForceNodeFailMessage) { + ZkInternalForceNodeFailMessage msg0 = (ZkInternalForceNodeFailMessage)msg; if (alives == null) alives = new HashSet<>(state.top.nodesById.keySet()); @@ -1358,8 +1407,8 @@ public class ZookeeperDiscoveryImpl { * @param msg */ private void processInternalMessage(ZkDiscoveryCustomEventData evtData, ZkInternalMessage msg) throws Exception { - if (msg instanceof ZkInternalFailNodeMessage) { - ZkInternalFailNodeMessage msg0 = (ZkInternalFailNodeMessage)msg; + if (msg instanceof ZkInternalForceNodeFailMessage) { + ZkInternalForceNodeFailMessage msg0 = (ZkInternalForceNodeFailMessage)msg; ClusterNode creatorNode = state.top.nodesById.get(evtData.sndNodeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/0bddcdef/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 875d264..2c6890f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -120,6 +120,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** */ private boolean dfltConsistenId; + /** */ + private UUID nodeId; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { if (testSockNio) @@ -127,6 +130,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + if (nodeId != null) + cfg.setNodeId(nodeId); + if (!dfltConsistenId) cfg.setConsistentId(igniteInstanceName); @@ -1397,6 +1403,17 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testDuplicatedNodeId() throws Exception { + nodeId = UUID.randomUUID(); + + startGrid(0); + + startGrid(1); + } + + /** * @param clients Clients. * @param c Closure to run. * @throws Exception If failed.
