zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/376a4845 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/376a4845 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/376a4845 Branch: refs/heads/ignite-zk Commit: 376a484569db8d1a0eb4b43128d7e327defb5e3d Parents: b7cbd4c Author: sboikov <sboi...@gridgain.com> Authored: Wed Dec 13 16:00:34 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Dec 13 17:52:03 2017 +0300 ---------------------------------------------------------------------- .../zk/internal/ZkAbstractCallabck.java | 66 +++ .../zk/internal/ZkAbstractChildrenCallback.java | 53 +++ .../zk/internal/ZkAbstractWatcher.java | 55 +++ .../zk/internal/ZkCollectDistributedFuture.java | 171 ++++++++ .../ZkCommunicationErrorProcessFuture.java | 225 ++++++++--- ...kCommunicationErrorResolveFinishMessage.java | 38 ++ ...ZkCommunicationErrorResolveStartMessage.java | 50 +++ .../zk/internal/ZkDiscoveryEventsData.java | 18 + .../discovery/zk/internal/ZkEventAckFuture.java | 142 ------- .../zk/internal/ZkForceNodeFailMessage.java | 55 +++ .../discovery/zk/internal/ZkIgnitePaths.java | 8 + .../ZkInternalCommunicationErrorMessage.java | 39 -- .../ZkInternalForceNodeFailMessage.java | 55 --- .../spi/discovery/zk/internal/ZkRunnable.java | 51 +++ .../zk/internal/ZookeeperDiscoveryImpl.java | 397 +++++++++++-------- 15 files changed, 981 insertions(+), 442 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java new file mode 100644 index 0000000..d2efb9f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import org.apache.ignite.internal.util.GridSpinBusyLock; + +/** + * + */ +abstract class ZkAbstractCallabck { + /** */ + final ZkRuntimeState rtState; + + /** */ + private final ZookeeperDiscoveryImpl impl; + + /** */ + private final GridSpinBusyLock busyLock; + + /** + * @param rtState Runtime state. + * @param impl Discovery impl. + */ + ZkAbstractCallabck(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { + this.rtState = rtState; + this.impl = impl; + + busyLock = impl.busyLock; + } + + /** + * @return {@code True} if is able to start processing. + */ + final boolean onProcessStart() { + return !rtState.closing && busyLock.enterBusy(); + } + + /** + * + */ + final void onProcessEnd() { + busyLock.leaveBusy(); + } + + /** + * @param e Error. + */ + final void onProcessError(Throwable e) { + impl.onFatalError(busyLock, e); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java new file mode 100644 index 0000000..5679993 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.List; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.data.Stat; + +/** + * + */ +abstract class ZkAbstractChildrenCallback extends ZkAbstractCallabck implements AsyncCallback.Children2Callback { + /** + * @param rtState Runtime state. + * @param impl Discovery impl. + */ + ZkAbstractChildrenCallback(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { + super(rtState, impl); + } + + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + if (!onProcessStart()) + return; + + try { + processResult0(rc, path, ctx, children, stat); + + onProcessEnd(); + } + catch (Throwable e) { + onProcessError(e); + } + } + + abstract void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat) + throws Exception; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java new file mode 100644 index 0000000..9098d05 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +/** + * + */ +abstract class ZkAbstractWatcher extends ZkAbstractCallabck implements Watcher { + /** + * @param rtState Runtime state. + * @param impl Discovery impl. + */ + ZkAbstractWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { + super(rtState, impl); + } + + /** {@inheritDoc} */ + @Override public final void process(WatchedEvent evt) { + if (!onProcessStart()) + return; + + try { + process0(evt); + + onProcessEnd(); + } + catch (Throwable e) { + onProcessError(e); + } + } + + /** + * @param evt Event. + * @throws Exception If failed. + */ + protected abstract void process0(WatchedEvent evt) throws Exception; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java new file mode 100644 index 0000000..fa529cf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +class ZkCollectDistributedFuture extends GridFutureAdapter<Void> { + /** */ + private final IgniteLogger log; + + /** */ + private final String futPath; + + /** */ + private final ZookeeperDiscoveryImpl impl; + + /** */ + private final Set<Long> remainingNodes; + + /** */ + private final Callable<Void> lsnr; + + /** + * @param impl + * @param rtState + * @param futPath + */ + ZkCollectDistributedFuture(ZookeeperDiscoveryImpl impl, ZkRuntimeState rtState, String futPath, Callable<Void> lsnr) throws Exception { + this.impl = impl; + this.log = impl.log(); + this.futPath = futPath; + this.lsnr = lsnr; + + ZkClusterNodes top = impl.nodes(); + + remainingNodes = U.newHashSet(top.nodesByOrder.size()); + + for (ZookeeperClusterNode node : top.nodesByInternalId.values()) + remainingNodes.add(node.order()); + + NodeResultsWatcher watcher = new NodeResultsWatcher(rtState, impl); + + if (remainingNodes.isEmpty()) + completeAndNotifyListener(); + else + rtState.zkClient.getChildrenAsync(futPath, watcher, watcher); + } + + /** + * @throws Exception If listener call failed. + */ + private void completeAndNotifyListener() throws Exception { + if (super.onDone()) + lsnr.call(); + } + + /** + * @param futPath + * @param client + * @param nodeOrder + * @param data + * @throws Exception If failed. + */ + static void saveNodeResult(String futPath, ZookeeperClient client, long nodeOrder, byte[] data) throws Exception { + client.createIfNeeded(futPath + "/" + nodeOrder, data, CreateMode.PERSISTENT); + } + + /** + * @param node Failed node. + */ + void onNodeFail(ZookeeperClusterNode node) throws Exception { + long nodeOrder = node.order(); + + if (remainingNodes.remove(nodeOrder)) { + int remaining = remainingNodes.size(); + + if (log.isInfoEnabled()) { + log.info("ZkCollectDistributedFuture removed remaining failed node [node=" + nodeOrder + + ", remaining=" + remaining + + ", futPath=" + futPath + ']'); + } + + if (remaining == 0) + completeAndNotifyListener(); + } + } + + /** + * + */ + class NodeResultsWatcher extends ZkAbstractWatcher implements AsyncCallback.Children2Callback { + /** + * @param rtState Runtime state. + * @param impl Discovery impl. + */ + NodeResultsWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { + super(rtState, impl); + } + + /** {@inheritDoc} */ + @Override protected void process0(WatchedEvent evt) { + if (evt.getType() == Watcher.Event.EventType.NodeChildrenChanged) + impl.zkClient().getChildrenAsync(evt.getPath(), this, this); + } + + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + if (!onProcessStart()) + return; + + try { + assert rc == 0 : KeeperException.Code.get(rc); + + if (isDone()) + return; + + for (int i = 0; i < children.size(); i++) { + Long nodeOrder = Long.parseLong(children.get(i)); + + if (remainingNodes.remove(nodeOrder)) { + int remaining = remainingNodes.size(); + + if (log.isInfoEnabled()) { + log.info("ZkCollectDistributedFuture added new result [node=" + nodeOrder + + ", remaining=" + remaining + + ", futPath=" + path + ']'); + } + + if (remaining == 0) + completeAndNotifyListener(); + } + } + + onProcessEnd(); + } + catch (Throwable e) { + onProcessError(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java index 2ea65e8..d87f500 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java @@ -17,24 +17,31 @@ package org.apache.ignite.spi.discovery.zk.internal; -import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.IgniteSpiTimeoutObject; +import org.jetbrains.annotations.Nullable; /** * */ -class ZkCommunicationErrorProcessFuture extends GridFutureAdapter implements IgniteSpiTimeoutObject, Runnable { +class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implements IgniteSpiTimeoutObject, Runnable { /** */ private final ZookeeperDiscoveryImpl impl; /** */ - private final Map<UUID, GridFutureAdapter<Boolean>> errNodes = new HashMap<>(); + private final Map<Long, GridFutureAdapter<Boolean>> nodeFuts = new HashMap<>(); /** */ private final long endTime; @@ -42,48 +49,142 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter implements Ign /** */ private final IgniteUuid id; + /** */ + private State state; + + /** */ + private long resolveTopVer; + + /** */ + private Set<Long> resFailedNodes; + + /** */ + private ZkCollectDistributedFuture nodeResFut; + + /** + * @param impl Discovery impl. + * @param timeout Timeout to wait before initiating resolve process. + * @return Future. + */ + static ZkCommunicationErrorProcessFuture createOnCommunicationError(ZookeeperDiscoveryImpl impl, long timeout) { + return new ZkCommunicationErrorProcessFuture(impl, State.WAIT_TIMEOUT, timeout); + } + + /** + * @param impl Discovery impl. + * @return Future. + */ + static ZkCommunicationErrorProcessFuture createOnStartResolveRequest(ZookeeperDiscoveryImpl impl) { + return new ZkCommunicationErrorProcessFuture(impl, State.RESOLVE_STARTED, 0); + } + /** * @param impl Discovery implementation. + * @param state Initial state. * @param timeout Wait timeout before initiating communication errors resolve. */ - ZkCommunicationErrorProcessFuture(ZookeeperDiscoveryImpl impl, long timeout) { + private ZkCommunicationErrorProcessFuture(ZookeeperDiscoveryImpl impl, State state, long timeout) { + assert state != State.DONE; + this.impl = impl; - id = IgniteUuid.fromUuid(impl.localNode().id()); + if (state == State.WAIT_TIMEOUT) { + assert timeout > 0 : timeout; + + id = IgniteUuid.fromUuid(impl.localNode().id()); + endTime = System.currentTimeMillis() + timeout; + } + else { + id = null; + endTime = 0; + } - endTime = System.currentTimeMillis() + timeout; + this.state = state; + } + + void nodeResultCollectFuture(ZkCollectDistributedFuture nodeResFut) { + assert nodeResFut == null : nodeResFut; + + this.nodeResFut = nodeResFut; + } + + void pingNodesAndNotifyFuture(long locNodeOrder, ZkRuntimeState rtState, String futPath, Collection<ClusterNode> nodes) + throws Exception { + ZkCollectDistributedFuture.saveNodeResult(futPath, rtState.zkClient, locNodeOrder, null); } /** - * @param nodeId Node ID. - * @return Future finished when communication error resolve is done. + * */ - GridFutureAdapter<Boolean> nodeStatusFuture(UUID nodeId) { - GridFutureAdapter<Boolean> fut; + void scheduleCheckOnTimeout() { + synchronized (this) { + if (state == State.WAIT_TIMEOUT) + impl.spi.getSpiContext().addTimeoutObject(this); + } + } - // TODO ZK: finish race. + /** + * @param topVer Topology version. + * @return {@code False} if future was already completed and need create another future instance. + */ + boolean onStartResolveRequest(long topVer) { + synchronized (this) { + if (state == State.DONE) + return false; + + if (state == State.WAIT_TIMEOUT) + impl.spi.getSpiContext().removeTimeoutObject(this); + + assert resolveTopVer == 0 : resolveTopVer; + + resolveTopVer = topVer; + + state = State.RESOLVE_STARTED; + } + + return true; + } + + /** + * @param node Node. + * @return Future finished when communication error resolve is done or {@code null} if another + * resolve process should be started. + */ + @Nullable IgniteInternalFuture<Boolean> nodeStatusFuture(ClusterNode node) { + GridFutureAdapter<Boolean> fut; synchronized (this) { - fut = errNodes.get(nodeId); + if (state == State.DONE) { + if (resolveTopVer != 0 && node.order() <= resolveTopVer) { + Boolean res = !F.contains(resFailedNodes, node.order()); + + return new GridFinishedFuture<>(res); + } + else + return null; + } + + fut = nodeFuts.get(node.order()); if (fut == null) - errNodes.put(nodeId, fut = new GridFutureAdapter<>()); + nodeFuts.put(node.order(), fut = new GridFutureAdapter<>()); } - if (impl.node(nodeId) == null) + if (impl.node(node.order()) == null) fut.onDone(false); return fut; } /** - * @param nodeId Node ID. + * @param node Failed node. */ - void onNodeFailed(UUID nodeId) { - GridFutureAdapter<Boolean> fut; + void onNodeFailed(ClusterNode node) { + GridFutureAdapter<Boolean> fut = null; synchronized (this) { - fut = errNodes.get(nodeId); + if (state == State.WAIT_TIMEOUT) + fut = nodeFuts.get(node.order()); } if (fut != null) @@ -92,14 +193,50 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter implements Ign /** {@inheritDoc} */ @Override public void run() { - if (checkNotDoneOnTimeout()) { + // Run from zk discovery worker pool after timeout. + if (processTimeout()) { try { - impl.sendCustomMessage(new ZkInternalCommunicationErrorMessage()); + impl.sendCustomMessage(new ZkCommunicationErrorResolveStartMessage(UUID.randomUUID())); } catch (Exception e) { - onError(e); + Collection<GridFutureAdapter<Boolean>> futs; + + synchronized (this) { + if (state != State.WAIT_TIMEOUT) + return; + + state = State.DONE; + + futs = nodeFuts.values(); // nodeFuts should not be modified after state changed to DONE. + } + + for (GridFutureAdapter<Boolean> fut : futs) + fut.onDone(e); + + onDone(e); + } + } + } + + /** + * @return {@code True} if need initiate resolve process after timeout expired. + */ + private boolean processTimeout() { + synchronized (this) { + if (state != State.WAIT_TIMEOUT) + return false; + + for (GridFutureAdapter<Boolean> fut : nodeFuts.values()) { + if (!fut.isDone()) + return true; } + + state = State.DONE; } + + onDone(null, null); + + return false; } /** {@inheritDoc} */ @@ -114,43 +251,37 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter implements Ign /** {@inheritDoc} */ @Override public void onTimeout() { - if (isDone()) - return; - - if (checkNotDoneOnTimeout()) + if (processTimeout()) impl.runInWorkerThread(this); } - /** - * @param e Error. - */ - private void onError(Exception e) { - List<GridFutureAdapter<Boolean>> futs; + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + impl.clearCommunicationErrorProcessFuture(this); - synchronized (this) { - futs = new ArrayList<>(errNodes.values()); + return true; } - for (GridFutureAdapter<Boolean> fut : futs) - fut.onDone(e); + return false; + } - onDone(e); + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZkCommunicationErrorProcessFuture.class, this); } /** - * @return {@code True} if future already finished. + * */ - private boolean checkNotDoneOnTimeout() { - // TODO ZK check state. - synchronized (this) { - for (GridFutureAdapter<Boolean> fut : errNodes.values()) { - if (!fut.isDone()) - return false; - } - } + enum State { + /** */ + DONE, - onDone(null); + /** */ + WAIT_TIMEOUT, - return true; + /** */ + RESOLVE_STARTED } } http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java new file mode 100644 index 0000000..144a5bf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.UUID; + +/** + * + */ +class ZkCommunicationErrorResolveFinishMessage implements ZkInternalMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + final UUID futId; + + /** + * @param futId Future ID. + */ + ZkCommunicationErrorResolveFinishMessage(UUID futId) { + this.futId = futId; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java new file mode 100644 index 0000000..e619d7b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.UUID; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + final UUID id; + + /** + * @param id Unique ID. + */ + ZkCommunicationErrorResolveStartMessage(UUID id) { + this.id = id; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java index 37dc7df..faea49e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java @@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.io.Serializable; import java.util.Collection; import java.util.TreeMap; +import java.util.UUID; import org.jetbrains.annotations.Nullable; /** @@ -44,6 +45,9 @@ class ZkDiscoveryEventsData implements Serializable { /** */ TreeMap<Long, ZkDiscoveryEventData> evts; + /** */ + private UUID commErrFutId; + /** * @param topVer Current topology version. * @param gridStartTime Cluster start time. @@ -56,6 +60,20 @@ class ZkDiscoveryEventsData implements Serializable { } /** + * @return Future ID. + */ + @Nullable UUID communicationErrorResolveFutureId() { + return commErrFutId; + } + + /** + * @param id Future ID. + */ + void communicationErrorResolveFutureId(UUID id) { + commErrFutId = id; + } + + /** * @param nodes Current nodes in topology (these nodes should ack that event processed). * @param evt Event. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java deleted file mode 100644 index ffe65c3..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.zk.internal; - -import java.util.List; -import java.util.Set; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.Stat; -import org.jetbrains.annotations.Nullable; - -/** - * - */ -public class ZkEventAckFuture extends GridFutureAdapter<Void> implements Watcher, AsyncCallback.Children2Callback { - /** */ - private final IgniteLogger log; - - /** */ - private final ZookeeperDiscoveryImpl impl; - - /** */ - private final Long evtId; - - /** */ - private final String evtPath; - - /** */ - private final int expAcks; - - /** */ - private final Set<Integer> remaininAcks; - - /** - * @param impl - * @param evtPath - * @param evtId - */ - ZkEventAckFuture(ZookeeperDiscoveryImpl impl, String evtPath, Long evtId) { - this.impl = impl; - this.log = impl.log(); - this.evtPath = evtPath; - this.evtId = evtId; - - ZkClusterNodes top = impl.nodes(); - - remaininAcks = U.newHashSet(top.nodesById.size()); - - for (ZookeeperClusterNode node : top.nodesByInternalId.values()) { - if (!node.isLocal()) - remaininAcks.add(node.internalId()); - } - - expAcks = remaininAcks.size(); - - if (expAcks == 0) - onDone(); - else - impl.zkClient().getChildrenAsync(evtPath, this, this); - } - - /** - * @return Event ID. - */ - Long eventId() { - return evtId; - } - - /** - * @param node Failed node. - */ - void onNodeFail(ZookeeperClusterNode node) { - assert !remaininAcks.isEmpty(); - - if (remaininAcks.remove(node.internalId()) && remaininAcks.isEmpty()) - onDone(); - } - - /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { - if (super.onDone(res, err)) { - return true; - } - - return false; - } - - /** {@inheritDoc} */ - @Override public void process(WatchedEvent evt) { - if (isDone()) - return; - - if (evt.getType() == Event.EventType.NodeChildrenChanged) { - if (evtPath.equals(evt.getPath())) - impl.zkClient().getChildrenAsync(evtPath, this, this); - else - U.warn(log, "Received event for unknown path: " + evt.getPath()); - } - } - - /** {@inheritDoc} */ - @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - assert rc == 0 : KeeperException.Code.get(rc); - - if (isDone()) - return; - - if (expAcks == stat.getCversion()) { - log.info("Received expected number of acks [expCnt=" + expAcks + ", cVer=" + stat.getCversion() + ']'); - - onDone(); - } - else { - for (int i = 0; i < children.size(); i++) { - Integer nodeInternalId = Integer.parseInt(children.get(i)); - - if (remaininAcks.remove(nodeInternalId) && remaininAcks.size() == 0) - onDone(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java new file mode 100644 index 0000000..333f457 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.UUID; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + final UUID nodeId; + + /** */ + final String warning; + + /** + * @param nodeId Node ID. + * @param warning Warning to be displayed on all nodes. + */ + ZkForceNodeFailMessage(UUID nodeId, String warning) { + this.nodeId = nodeId; + this.warning = warning; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index f08032a..06c5d9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -292,4 +292,12 @@ class ZkIgnitePaths { String ackEventDataPath(long evtId) { return customEvtsAcksDir + "/" + String.valueOf(evtId); } + + /** + * @param id Future ID. + * @return Future path. + */ + String distributedFutureBasePath(UUID id) { + return evtsPath + "/f-" + id; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java deleted file mode 100644 index d7ed7ab..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.zk.internal; - -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; -import org.jetbrains.annotations.Nullable; - -/** - * - */ -public class ZkInternalCommunicationErrorMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean isMutable() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java deleted file mode 100644 index 8d7a3df..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.zk.internal; - -import java.util.UUID; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; -import org.jetbrains.annotations.Nullable; - -/** - * - */ -public class ZkInternalForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - final UUID nodeId; - - /** */ - final String warning; - - /** - * @param nodeId Node ID. - * @param warning Warning to be displayed on all nodes. - */ - ZkInternalForceNodeFailMessage(UUID nodeId, String warning) { - this.nodeId = nodeId; - this.warning = warning; - } - - /** {@inheritDoc} */ - @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean isMutable() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java new file mode 100644 index 0000000..fb6cf89 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +/** + * + */ +public abstract class ZkRunnable extends ZkAbstractCallabck implements Runnable { + /** + * @param rtState Runtime state. + * @param impl Discovery impl. + */ + ZkRunnable(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { + super(rtState, impl); + } + + /** {@inheritDoc} */ + @Override public void run() { + if (!onProcessStart()) + return; + + try { + run0(); + + onProcessEnd(); + } + catch (Throwable e) { + onProcessError(e); + } + } + + /** + * + */ + protected abstract void run0() throws Exception; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index d21c18b..a153d11 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -43,6 +44,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.events.DiscoveryCustomEvent; @@ -52,6 +54,7 @@ import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.MarshallerUtils; @@ -88,7 +91,7 @@ public class ZookeeperDiscoveryImpl { static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD"; /** */ - private final ZookeeperDiscoverySpi spi; + final ZookeeperDiscoverySpi spi; /** */ private final String igniteInstanceName; @@ -109,7 +112,7 @@ public class ZookeeperDiscoveryImpl { private final IgniteLogger log; /** */ - private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + final GridSpinBusyLock busyLock = new GridSpinBusyLock(); /** */ private final ZookeeperClusterNode locNode; @@ -217,9 +220,25 @@ public class ZookeeperDiscoveryImpl { return rtState.top.nodesById.get(nodeId); } + /** + * @param nodeOrder Node order. + * @return Node instance. + */ + @Nullable public ZookeeperClusterNode node(long nodeOrder) { + assert nodeOrder > 0 : nodeOrder; + + return rtState.top.nodesByOrder.get(nodeOrder); + } + /** */ private final AtomicReference<ZkCommunicationErrorProcessFuture> commErrProcFut = new AtomicReference<>(); + void clearCommunicationErrorProcessFuture(ZkCommunicationErrorProcessFuture fut) { + assert fut.isDone() : fut; + + commErrProcFut.compareAndSet(fut, null); + } + /** * @param node0 Problem node ID * @param err Connect error. @@ -230,24 +249,41 @@ public class ZookeeperDiscoveryImpl { if (node == null) return; - ZkCommunicationErrorProcessFuture fut = commErrProcFut.get(); + IgniteInternalFuture<Boolean> nodeStatusFut; + + for (;;) { + ZkCommunicationErrorProcessFuture fut = commErrProcFut.get(); + + if (fut == null || fut.isDone()) { + ZkCommunicationErrorProcessFuture newFut = ZkCommunicationErrorProcessFuture.createOnCommunicationError( + this, + node.sessionTimeout() + 1000); + + if (commErrProcFut.compareAndSet(fut, newFut)) { + fut = newFut; - if (fut == null || fut.isDone()) { - ZkCommunicationErrorProcessFuture newFut = new ZkCommunicationErrorProcessFuture( - this, - node.sessionTimeout() + 1000); + fut.scheduleCheckOnTimeout(); + } + else + fut = commErrProcFut.get(); + } - if (commErrProcFut.compareAndSet(fut, newFut)) { - fut = newFut; + nodeStatusFut = fut.nodeStatusFuture(node); - spi.getSpiContext().addTimeoutObject(fut); + if (nodeStatusFut != null) + break; + else { + try { + fut.get(); + } + catch (IgniteCheckedException e) { + U.warn(log, "Previous communication error process future failed: " + e); + } } - else - fut = commErrProcFut.get(); } try { - fut.nodeStatusFuture(node.id()).get(); + nodeStatusFut.get(); } catch (IgniteCheckedException e) { throw new IgniteSpiException(e); @@ -312,7 +348,7 @@ public class ZookeeperDiscoveryImpl { return; } - sendCustomMessage(new ZkInternalForceNodeFailMessage(nodeId, warning)); + sendCustomMessage(new ZkForceNodeFailMessage(nodeId, warning)); } /** @@ -1503,22 +1539,23 @@ public class ZookeeperDiscoveryImpl { } /** + * @param zkClient Client. * @param evtPath Event path. * @param sndNodeId Sender node ID. * @return Event data. * @throws Exception If failed. */ - private byte[] readCustomEventData(String evtPath, UUID sndNodeId) throws Exception { + private byte[] readCustomEventData(ZookeeperClient zkClient, String evtPath, UUID sndNodeId) throws Exception { int partCnt = ZkIgnitePaths.customEventPartsCount(evtPath); if (partCnt > 1) { String partsBasePath = zkPaths.customEventPartsBasePath( ZkIgnitePaths.customEventPrefix(evtPath), sndNodeId); - return readMultipleParts(rtState.zkClient, partsBasePath, partCnt); + return readMultipleParts(zkClient, partsBasePath, partCnt); } else - return rtState.zkClient.getData(zkPaths.customEvtsDir + "/" + evtPath); + return zkClient.getData(zkPaths.customEvtsDir + "/" + evtPath); } /** @@ -1528,6 +1565,9 @@ public class ZookeeperDiscoveryImpl { private void generateCustomEvents(List<String> customEvtNodes) throws Exception { assert rtState.crd; + ZookeeperClient zkClient = rtState.zkClient; + ZkDiscoveryEventsData evtsData = rtState.evtsData; + TreeMap<Integer, String> newEvts = null; for (int i = 0; i < customEvtNodes.size(); i++) { @@ -1535,7 +1575,7 @@ public class ZookeeperDiscoveryImpl { int evtSeq = ZkIgnitePaths.customEventSequence(evtPath); - if (evtSeq > rtState.evtsData.procCustEvt) { + if (evtSeq > evtsData.procCustEvt) { if (newEvts == null) newEvts = new TreeMap<>(); @@ -1547,6 +1587,8 @@ public class ZookeeperDiscoveryImpl { Set<UUID> alives = null; for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) { + evtsData.procCustEvt = evtE.getKey(); + String evtPath = evtE.getValue(); UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtPath); @@ -1557,23 +1599,21 @@ public class ZookeeperDiscoveryImpl { sndNode = null; if (sndNode != null) { - byte[] evtBytes = readCustomEventData(evtPath, sndNodeId); + byte[] evtBytes = readCustomEventData(zkClient, evtPath, sndNodeId); DiscoverySpiCustomMessage msg; try { msg = unmarshalZip(evtBytes); - rtState.evtsData.evtIdGen++; - - if (msg instanceof ZkInternalForceNodeFailMessage) { - ZkInternalForceNodeFailMessage msg0 = (ZkInternalForceNodeFailMessage)msg; + if (msg instanceof ZkForceNodeFailMessage) { + ZkForceNodeFailMessage msg0 = (ZkForceNodeFailMessage)msg; if (alives == null) alives = new HashSet<>(rtState.top.nodesById.keySet()); if (alives.contains(msg0.nodeId)) { - rtState.evtsData.topVer++; + evtsData.topVer++; alives.remove(msg0.nodeId); @@ -1581,9 +1621,9 @@ public class ZookeeperDiscoveryImpl { assert node != null : msg0.nodeId; - for (String child : zkClient().getChildren(zkPaths.aliveNodesDir)) { + for (String child : zkClient.getChildren(zkPaths.aliveNodesDir)) { if (ZkIgnitePaths.aliveInternalId(child) == node.internalId()) { - zkClient().deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + child); + zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + child); break; } @@ -1593,20 +1633,51 @@ public class ZookeeperDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Ignore forcible node fail request for unknown node: " + msg0.nodeId); + deleteCustomEventDataAsync(zkClient, evtPath); + + continue; + } + } + else if (msg instanceof ZkCommunicationErrorResolveStartMessage) { + ZkCommunicationErrorResolveStartMessage msg0 = + (ZkCommunicationErrorResolveStartMessage)msg; + + if (evtsData.communicationErrorResolveFutureId() != null) { + if (log.isInfoEnabled()) { + log.info("Ignore communication error resolve message, resolve process " + + "already started [sndNode=" + sndNode + ']'); + } + + deleteCustomEventDataAsync(zkClient, evtPath); + continue; } + else { + if (log.isInfoEnabled()) { + log.info("Start communication error resolve [sndNode=" + sndNode + + ", topVer=" + evtsData.topVer + ']'); + } + + zkClient.createIfNeeded(zkPaths.distributedFutureBasePath(msg0.id), + null, + PERSISTENT); + + evtsData.communicationErrorResolveFutureId(msg0.id); + } } + evtsData.evtIdGen++; + ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData( - rtState.evtsData.evtIdGen, - rtState.evtsData.topVer, + evtsData.evtIdGen, + evtsData.topVer, sndNodeId, evtPath, false); evtData.msg = msg; - rtState.evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData); + evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData); if (log.isDebugEnabled()) log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']'); @@ -1622,8 +1693,6 @@ public class ZookeeperDiscoveryImpl { deleteCustomEventDataAsync(rtState.zkClient, evtPath); } - - rtState.evtsData.procCustEvt = evtE.getKey(); } saveAndProcessNewEvents(); @@ -1695,6 +1764,8 @@ public class ZookeeperDiscoveryImpl { private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws Exception { TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts; + ZookeeperClient zkClient = rtState.zkClient; + boolean updateNodeInfo = false; for (ZkDiscoveryEventData evtData : evts.tailMap(rtState.locNodeInfo.lastProcEvt, false).values()) { @@ -1768,10 +1839,12 @@ public class ZookeeperDiscoveryImpl { if (evtData0.ackEvent()) { String path = zkPaths.ackEventDataPath(evtData0.eventId()); - msg = unmarshalZip(rtState.zkClient.getData(path)); + msg = unmarshalZip(zkClient.getData(path)); } else { - byte[] msgBytes = readCustomEventData(evtData0.evtPath, evtData0.sndNodeId); + byte[] msgBytes = readCustomEventData(zkClient, + evtData0.evtPath, + evtData0.sndNodeId); msg = unmarshalZip(msgBytes); } @@ -1814,7 +1887,7 @@ public class ZookeeperDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Update processed events: " + rtState.locNodeInfo.lastProcEvt); - rtState.zkClient.setData(rtState.locNodeZkPath, marshalZip(rtState.locNodeInfo), -1); + zkClient.setData(rtState.locNodeZkPath, marshalZip(rtState.locNodeInfo), -1); } } @@ -1897,29 +1970,134 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void processInternalMessage(ZkDiscoveryCustomEventData evtData, ZkInternalMessage msg) throws Exception { - if (msg instanceof ZkInternalForceNodeFailMessage) { - ZkInternalForceNodeFailMessage msg0 = (ZkInternalForceNodeFailMessage)msg; + if (msg instanceof ZkForceNodeFailMessage) + processForceNodeFailMessage((ZkForceNodeFailMessage)msg, evtData); + else if (msg instanceof ZkCommunicationErrorResolveStartMessage) { + processStartResolveCommunicationErrorMessage( + (ZkCommunicationErrorResolveStartMessage)msg, + evtData); + } + else if (msg instanceof ZkCommunicationErrorResolveFinishMessage) { + ZkCommunicationErrorResolveFinishMessage msg0 = (ZkCommunicationErrorResolveFinishMessage)msg; + } + } + + /** + * @param msg Message. + * @param evtData Event data. + * @throws Exception If failed. + */ + private void processForceNodeFailMessage(ZkForceNodeFailMessage msg, ZkDiscoveryCustomEventData evtData) + throws Exception { + ClusterNode creatorNode = rtState.top.nodesById.get(evtData.sndNodeId); + + if (msg.warning != null) { + U.warn(log, "Received EVT_NODE_FAILED event with warning [" + + "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : evtData.sndNodeId) + + ", nodeId=" + msg.nodeId + + ", msg=" + msg.warning + ']'); + } + else { + U.warn(log, "Received force EVT_NODE_FAILED event [" + + "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : evtData.sndNodeId) + + ", nodeId=" + msg.nodeId + ']'); + } + + ZookeeperClusterNode node = rtState.top.nodesById.get(msg.nodeId); + + assert node != null : msg.nodeId; + + processNodeFail(node.internalId(), evtData.topologyVersion()); + } + + /** + * @param msg Message. + * @param evtData Event data. + */ + private void processStartResolveCommunicationErrorMessage(ZkCommunicationErrorResolveStartMessage msg, + ZkDiscoveryCustomEventData evtData) throws Exception { + ZkCommunicationErrorProcessFuture fut; - ClusterNode creatorNode = rtState.top.nodesById.get(evtData.sndNodeId); + for (;;) { + fut = commErrProcFut.get(); + + if (fut == null || fut.isDone()) { + ZkCommunicationErrorProcessFuture newFut = + ZkCommunicationErrorProcessFuture.createOnStartResolveRequest(this); - if (msg0.warning != null) { - U.warn(log, "Received EVT_NODE_FAILED event with warning [" + - "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : evtData.sndNodeId) + - ", nodeId=" + msg0.nodeId + - ", msg=" + msg0.warning + ']'); + if (commErrProcFut.compareAndSet(fut, newFut)) + fut = newFut; + else + fut = commErrProcFut.get(); } + + if (fut.onStartResolveRequest(evtData.topologyVersion())) + break; else { - U.warn(log, "Received force EVT_NODE_FAILED event [" + - "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : evtData.sndNodeId) + - ", nodeId=" + msg0.nodeId + ']'); + try { + fut.get(); + } + catch (Exception e) { + U.warn(log, "Previous communication error process future failed: " + e); + } } + } + + assert !fut.isDone() : fut; - ZookeeperClusterNode node = rtState.top.nodesById.get(msg0.nodeId); + final String futPath = zkPaths.distributedFutureBasePath(msg.id); + final ZkCommunicationErrorProcessFuture fut0 = fut; + final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); + + if (rtState.crd) { + ZkCollectDistributedFuture nodeResFut = new ZkCollectDistributedFuture(this, rtState, futPath, + new Callable<Void>() { + @Override public Void call() throws Exception { + // Future is completed from ZK event thread. + finishCommunicationResolveProcess(rtState); - assert node != null : msg0.nodeId; + return null; + } + } + ); - processNodeFail(node.internalId(), evtData.topologyVersion()); + fut.nodeResultCollectFuture(nodeResFut); } + + runInWorkerThread(new ZkRunnable(rtState, this) { + @Override protected void run0() throws Exception { + fut0.pingNodesAndNotifyFuture(locNode.order(), rtState, futPath, topSnapshot); + } + }); + } + + /** + * @param rtState Runtime state. + * @throws Exception If failed. + */ + void finishCommunicationResolveProcess(ZkRuntimeState rtState) throws Exception { + ZkDiscoveryEventsData evtsData = rtState.evtsData; + + UUID futId = rtState.evtsData.communicationErrorResolveFutureId(); + + assert futId != null; + + rtState.evtsData.communicationErrorResolveFutureId(null); + + ZkCommunicationErrorResolveFinishMessage msg = new ZkCommunicationErrorResolveFinishMessage(futId); + + ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData( + evtsData.evtIdGen, + evtsData.topVer, + locNode.id(), + null, + false); + + evtData.msg = msg; + + evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData); + + saveAndProcessNewEvents(); } /** @@ -2041,7 +2219,7 @@ public class ZookeeperDiscoveryImpl { ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get(); if (commErrFut != null) - commErrFut.onNodeFailed(failedNode.id()); + commErrFut.onNodeFailed(failedNode); final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); @@ -2301,8 +2479,6 @@ public class ZookeeperDiscoveryImpl { connState = ConnectionState.STOPPED; } - ZookeeperClient zkClient = rtState.zkClient; - rtState.onCloseStart(); busyLock.block(); @@ -2311,6 +2487,8 @@ public class ZookeeperDiscoveryImpl { joinFut.onDone(e); + ZookeeperClient zkClient = rtState.zkClient; + if (zkClient != null) zkClient.close(); @@ -2321,7 +2499,7 @@ public class ZookeeperDiscoveryImpl { * @param busyLock Busy lock. * @param err Error. */ - private void onFatalError(GridSpinBusyLock busyLock, Throwable err) { + void onFatalError(GridSpinBusyLock busyLock, Throwable err) { busyLock.leaveBusy(); if (err instanceof ZookeeperClientFailedException) @@ -2444,111 +2622,12 @@ public class ZookeeperDiscoveryImpl { /** * */ - abstract class ZkCallabck { - /** */ - final ZkRuntimeState rtState; - - /** - * @param rtState Runtime state. - */ - ZkCallabck(ZkRuntimeState rtState) { - this.rtState = rtState; - } - - /** - * @return {@code True} if is able to start processing. - */ - final boolean onProcessStart() { - return !rtState.closing && busyLock.enterBusy(); - } - - /** - * - */ - final void onProcessEnd() { - busyLock.leaveBusy(); - } - - /** - * @param e Error. - */ - final void onProcessError(Throwable e) { - onFatalError(busyLock, e); - } - } - - /** - * - */ - abstract class AbstractWatcher extends ZkCallabck implements Watcher { - /** - * @param rtState Runtime state. - */ - AbstractWatcher(ZkRuntimeState rtState) { - super(rtState); - } - - /** {@inheritDoc} */ - @Override public final void process(WatchedEvent evt) { - if (!onProcessStart()) - return; - - try { - process0(evt); - - onProcessEnd(); - } - catch (Throwable e) { - onProcessError(e); - } - } - - /** - * @param evt Event. - * @throws Exception If failed. - */ - protected abstract void process0(WatchedEvent evt) throws Exception; - } - - /** - * - */ - abstract class AbstractChildrenCallback extends ZkCallabck implements AsyncCallback.Children2Callback { - /** - * @param rtState Runtime state. - */ - AbstractChildrenCallback(ZkRuntimeState rtState) { - super(rtState); - } - - /** {@inheritDoc} */ - @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - if (!onProcessStart()) - return; - - try { - processResult0(rc, path, ctx, children, stat); - - onProcessEnd(); - } - catch (Throwable e) { - onProcessError(e); - } - } - - abstract void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat) - throws Exception; - } - - /** - * - */ - private class ZkWatcher extends AbstractWatcher implements ZkRuntimeState.ZkWatcher { + private class ZkWatcher extends ZkAbstractWatcher implements ZkRuntimeState.ZkWatcher { /** * @param rtState Runtime state. */ ZkWatcher(ZkRuntimeState rtState) { - super(rtState); + super(rtState, ZookeeperDiscoveryImpl.this); } /** {@inheritDoc} */ @@ -2619,12 +2698,12 @@ public class ZookeeperDiscoveryImpl { /** * */ - private class AliveNodeDataWatcher extends AbstractWatcher implements ZkRuntimeState.ZkAliveNodeDataWatcher { + private class AliveNodeDataWatcher extends ZkAbstractWatcher implements ZkRuntimeState.ZkAliveNodeDataWatcher { /** * @param rtState Runtime state. */ AliveNodeDataWatcher(ZkRuntimeState rtState) { - super(rtState); + super(rtState, ZookeeperDiscoveryImpl.this); } /** {@inheritDoc} */ @@ -2691,12 +2770,12 @@ public class ZookeeperDiscoveryImpl { /** * */ - private class PreviousNodeWatcher extends AbstractWatcher implements AsyncCallback.StatCallback { + private class PreviousNodeWatcher extends ZkAbstractWatcher implements AsyncCallback.StatCallback { /** * @param rtState Runtime state. */ PreviousNodeWatcher(ZkRuntimeState rtState) { - super(rtState); + super(rtState, ZookeeperDiscoveryImpl.this); } /** {@inheritDoc} */ @@ -2731,12 +2810,12 @@ public class ZookeeperDiscoveryImpl { /** * */ - class CheckCoordinatorCallback extends AbstractChildrenCallback { + class CheckCoordinatorCallback extends ZkAbstractChildrenCallback { /** * @param rtState Runtime state. */ CheckCoordinatorCallback(ZkRuntimeState rtState) { - super(rtState); + super(rtState, ZookeeperDiscoveryImpl.this); } /** {@inheritDoc} */