zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b78183fe Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b78183fe Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b78183fe Branch: refs/heads/ignite-zk Commit: b78183feede25b3cf78a40731b82b7e75d0e0ad8 Parents: 39edd4b Author: sboikov <sboi...@gridgain.com> Authored: Mon Dec 11 15:45:02 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Dec 12 11:24:16 2017 +0300 ---------------------------------------------------------------------- .../managers/discovery/IgniteDiscoverySpi.java | 2 +- .../ZkCommunicationErrorProcessFuture.java | 94 ++++++++++++++++++++ .../ZkInternalCommunicationErrorMessage.java | 39 ++++++++ .../zk/internal/ZookeeperDiscoveryImpl.java | 41 +++++++++ .../IgniteOptimisticTxSuspendResumeTest.java | 2 + 5 files changed, 177 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b78183fe/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java index 2752210..1e4524e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java @@ -32,7 +32,7 @@ public interface IgniteDiscoverySpi extends DiscoverySpi { /** * - * @return + * @return {@code True} if SPI supports client reconnect. */ public boolean reconnectSupported(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b78183fe/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java new file mode 100644 index 0000000..91ecaf7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.UUID; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.IgniteSpiTimeoutObject; +import org.jboss.netty.util.internal.ConcurrentHashMap; + +/** + * + */ +class ZkCommunicationErrorProcessFuture extends GridFutureAdapter implements IgniteSpiTimeoutObject, Runnable { + /** */ + private final ZookeeperDiscoveryImpl impl; + + /** */ + private final ConcurrentHashMap<UUID, GridFutureAdapter<Boolean>> errNodes = new ConcurrentHashMap<>(); + + /** */ + private final long endTime; + + /** */ + private final IgniteUuid id; + + /** + * @param impl Discovery implementation. + */ + ZkCommunicationErrorProcessFuture(ZookeeperDiscoveryImpl impl, long timeout) { + this.impl = impl; + + id = IgniteUuid.fromUuid(impl.localNode().id()); + + endTime = System.currentTimeMillis() + timeout; + } + + GridFutureAdapter<Boolean> nodeStatusFuture(UUID nodeId) { + GridFutureAdapter<Boolean> fut = errNodes.get(nodeId); + + if (fut == null) { + GridFutureAdapter<Boolean> old = errNodes.putIfAbsent(nodeId, fut = new GridFutureAdapter<>()); + + if (old != null) + fut = old; + } + + if (impl.node(nodeId) == null) + fut.onDone(false); + + return fut; + } + + void onNodeFailed(UUID nodeId) { + GridFutureAdapter<Boolean> fut = errNodes.get(nodeId); + + if (fut != null) + fut.onDone(false); + } + + /** {@inheritDoc} */ + @Override public void run() { + // TODO ZK + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return null; + } + + @Override public long endTime() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + // TODO ZK + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b78183fe/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java new file mode 100644 index 0000000..d7ed7ab --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class ZkInternalCommunicationErrorMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b78183fe/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 5190329..9c1e398 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -31,6 +31,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; @@ -232,6 +233,41 @@ public class ZookeeperDiscoveryImpl { return rtState.top.nodesById.get(nodeId); } + /** */ + private final AtomicReference<ZkCommunicationErrorProcessFuture> commErrProcFut = new AtomicReference<>(); + + /** + * @param nodeId Problem node ID + * @param err Connect error. + */ + public void onCommunicationError(UUID nodeId, Exception err) { + ZookeeperClusterNode node = node(nodeId); + + if (node == null) + return; + + ZkCommunicationErrorProcessFuture fut = commErrProcFut.get(); + + if (fut == null || fut.isDone()) { + ZkCommunicationErrorProcessFuture newFut = new ZkCommunicationErrorProcessFuture(this, node.sessionTimeout()); + + if (commErrProcFut.compareAndSet(fut, newFut)) { + fut = newFut; + + sendCustomMessage(new ZkInternalCommunicationErrorMessage()); + } + else + fut = commErrProcFut.get(); + } + + try { + fut.nodeStatusFuture(nodeId).get(); + } + catch (IgniteCheckedException e) { + throw new IgniteSpiException(e); + } + } + /** * @param nodeId Node ID. * @return Ping result. @@ -2014,6 +2050,11 @@ public class ZookeeperDiscoveryImpl { if (pingFut != null) pingFut.onDone(false); + ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get(); + + if (commErrFut != null) + commErrFut.onNodeFailed(failedNode.id()); + final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); lsnr.onDiscovery(EVT_NODE_FAILED, http://git-wip-us.apache.org/repos/asf/ignite/blob/b78183fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java index 86c0fa4..2f77dae 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java @@ -661,6 +661,8 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest ", backups=" + ccfg.getBackups() + ", near=" + (ccfg.getNearConfiguration() != null) + "]"); + awaitPartitionMapExchange(); + int srvNum = serversNumber(); if (serversNumber() > 1) { ignite(serversNumber() + 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());