zk

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b78183fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b78183fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b78183fe

Branch: refs/heads/ignite-zk
Commit: b78183feede25b3cf78a40731b82b7e75d0e0ad8
Parents: 39edd4b
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Dec 11 15:45:02 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Dec 12 11:24:16 2017 +0300

----------------------------------------------------------------------
 .../managers/discovery/IgniteDiscoverySpi.java  |  2 +-
 .../ZkCommunicationErrorProcessFuture.java      | 94 ++++++++++++++++++++
 .../ZkInternalCommunicationErrorMessage.java    | 39 ++++++++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 41 +++++++++
 .../IgniteOptimisticTxSuspendResumeTest.java    |  2 +
 5 files changed, 177 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b78183fe/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
index 2752210..1e4524e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
@@ -32,7 +32,7 @@ public interface IgniteDiscoverySpi extends DiscoverySpi {
 
     /**
      *
-     * @return
+     * @return {@code True} if SPI supports client reconnect.
      */
     public boolean reconnectSupported();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b78183fe/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
new file mode 100644
index 0000000..91ecaf7
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
+
+/**
+ * Future tracking the processing of a communication error reported to ZooKeeper discovery.
+ * <p>
+ * Keeps one status future per problem node. A status future is completed with {@code false}
+ * when the node is not found by the discovery implementation or is reported as failed.
+ * {@link #run()} and {@link #onTimeout()} are not implemented yet.
+ */
+class ZkCommunicationErrorProcessFuture extends GridFutureAdapter implements IgniteSpiTimeoutObject, Runnable {
+    /** Discovery implementation used to look up nodes. */
+    private final ZookeeperDiscoveryImpl impl;
+
+    /** Status futures keyed by the ID of the node a communication error was reported for. */
+    private final ConcurrentHashMap<UUID, GridFutureAdapter<Boolean>> errNodes = new ConcurrentHashMap<>();
+
+    /** Timeout object end time: creation time plus the timeout passed to the constructor. */
+    private final long endTime;
+
+    /** Timeout object ID, derived from the local node ID. */
+    private final IgniteUuid id;
+
+    /**
+     * @param impl Discovery implementation.
+     * @param timeout Timeout in milliseconds, counted from the moment of creation.
+     */
+    ZkCommunicationErrorProcessFuture(ZookeeperDiscoveryImpl impl, long timeout) {
+        this.impl = impl;
+
+        id = IgniteUuid.fromUuid(impl.localNode().id());
+
+        endTime = System.currentTimeMillis() + timeout;
+    }
+
+    /**
+     * Gets or creates the status future for the given node.
+     *
+     * @param nodeId Node ID.
+     * @return Status future; it is completed with {@code false} right away if the node is not found.
+     */
+    GridFutureAdapter<Boolean> nodeStatusFuture(UUID nodeId) {
+        GridFutureAdapter<Boolean> fut = errNodes.get(nodeId);
+
+        if (fut == null) {
+            // Another thread may register a future concurrently, in this case use the registered one.
+            GridFutureAdapter<Boolean> old = errNodes.putIfAbsent(nodeId, fut = new GridFutureAdapter<>());
+
+            if (old != null)
+                fut = old;
+        }
+
+        if (impl.node(nodeId) == null)
+            fut.onDone(false);
+
+        return fut;
+    }
+
+    /**
+     * Completes the status future of the failed node with {@code false}, if one was requested.
+     *
+     * @param nodeId Failed node ID.
+     */
+    void onNodeFailed(UUID nodeId) {
+        GridFutureAdapter<Boolean> fut = errNodes.get(nodeId);
+
+        if (fut != null)
+            fut.onDone(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run() {
+        // TODO ZK
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        // Return the ID assigned in the constructor (was a hard-coded null).
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long endTime() {
+        // Return the end time computed in the constructor (was a hard-coded 0).
+        return endTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTimeout() {
+        // TODO ZK
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b78183fe/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java
new file mode 100644
index 0000000..d7ed7ab
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Internal custom discovery message sent by ZooKeeper discovery when a communication
+ * error is reported. The message carries no payload.
+ */
+public class ZkInternalCommunicationErrorMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        // The message is reported as immutable.
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public DiscoverySpiCustomMessage ackMessage() {
+        // No acknowledgement message is produced.
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b78183fe/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 5190329..9c1e398 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -31,6 +31,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
@@ -232,6 +233,41 @@ public class ZookeeperDiscoveryImpl {
         return rtState.top.nodesById.get(nodeId);
     }
 
+    /** Latest communication error process future, {@code null} until the first error is reported. */
+    private final AtomicReference<ZkCommunicationErrorProcessFuture> 
commErrProcFut = new AtomicReference<>();
+
+    /**
+     * Handles a communication error reported for the given node.
+     * <p>
+     * Does nothing if the node is not found. Otherwise joins the current error process future,
+     * creating a new one and sending {@link ZkInternalCommunicationErrorMessage} when there is
+     * no future yet or the previous one is done, and then waits for the node status future.
+     *
+     * @param nodeId Problem node ID
+     * @param err Connect error.
+     * @throws IgniteSpiException If waiting for the node status future fails.
+     */
+    public void onCommunicationError(UUID nodeId, Exception err) {
+        ZookeeperClusterNode problemNode = node(nodeId);
+
+        if (problemNode == null)
+            return;
+
+        ZkCommunicationErrorProcessFuture procFut = commErrProcFut.get();
+
+        if (procFut == null || procFut.isDone()) {
+            ZkCommunicationErrorProcessFuture created =
+                new ZkCommunicationErrorProcessFuture(this, problemNode.sessionTimeout());
+
+            boolean installed = commErrProcFut.compareAndSet(procFut, created);
+
+            // Only the thread which installed the new future sends the message.
+            if (installed)
+                sendCustomMessage(new ZkInternalCommunicationErrorMessage());
+
+            // On a lost race use the future installed by the other thread.
+            procFut = installed ? created : commErrProcFut.get();
+        }
+
+        try {
+            procFut.nodeStatusFuture(nodeId).get();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException(e);
+        }
+    }
+
     /**
      * @param nodeId Node ID.
      * @return Ping result.
@@ -2014,6 +2050,11 @@ public class ZookeeperDiscoveryImpl {
             if (pingFut != null)
                 pingFut.onDone(false);
 
+            ZkCommunicationErrorProcessFuture commErrFut = 
commErrProcFut.get();
+
+            if (commErrFut != null)
+                commErrFut.onNodeFailed(failedNode.id());
+
             final List<ClusterNode> topSnapshot = 
rtState.top.topologySnapshot();
 
             lsnr.onDiscovery(EVT_NODE_FAILED,

http://git-wip-us.apache.org/repos/asf/ignite/blob/b78183fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
index 86c0fa4..2f77dae 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
@@ -661,6 +661,8 @@ public class IgniteOptimisticTxSuspendResumeTest extends 
GridCommonAbstractTest
                 ", backups=" + ccfg.getBackups() +
                 ", near=" + (ccfg.getNearConfiguration() != null) + "]");
 
+            awaitPartitionMapExchange();
+
             int srvNum = serversNumber();
             if (serversNumber() > 1) {
                 ignite(serversNumber() + 1).createNearCache(ccfg.getName(), 
new NearCacheConfiguration<>());

Reply via email to