Repository: ignite
Updated Branches:
  refs/heads/ignite-zk c1c644b9c -> 2283df29b


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2283df29
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2283df29
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2283df29

Branch: refs/heads/ignite-zk
Commit: 2283df29bdd5c5ff5ec4eb4c9cdcd0f21bc2d489
Parents: c1c644b
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Dec 21 15:01:35 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Dec 21 15:13:06 2017 +0300

----------------------------------------------------------------------
 .../datastreamer/DataStreamerImpl.java          | 31 +++++++++++++-------
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 17 ++++++++---
 ...niteClientReconnectFailoverAbstractTest.java |  7 +++--
 3 files changed, 38 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2283df29/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 4e3f233..e97ac79 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -45,6 +45,7 @@ import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteDataStreamerTimeoutException;
 import org.apache.ignite.IgniteException;
@@ -98,6 +99,7 @@ import 
org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.GPC;
@@ -1068,6 +1070,9 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
             return;
 
         while (true) {
+            if (disconnectErr != null)
+                throw disconnectErr;
+
             Queue<IgniteInternalFuture<?>> q = null;
 
             for (Buffer buf : bufMappings.values()) {
@@ -1114,8 +1119,8 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                         throw new IgniteDataStreamerTimeoutException("Data 
streamer exceeded timeout on flush.", e);
                     }
                     catch (IgniteCheckedException e) {
-                        //if (log.isDebugEnabled())
-                            log.error("Failed to flush buffer: " + e, e);
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to flush buffer: " + e);
 
                         err = true;
                     }
@@ -1808,15 +1813,19 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                 catch (IgniteCheckedException e) {
                     GridFutureAdapter<Object> fut0 = 
((GridFutureAdapter<Object>)fut);
 
-                    try {
-                        if (ctx.discovery().alive(node) && 
ctx.discovery().pingNode(node.id()))
-                            fut0.onDone(e);
-                        else
-                            fut0.onDone(new 
ClusterTopologyCheckedException("Failed to send request (node has left): "
-                                + node.id()));
-                    }
-                    catch (IgniteClientDisconnectedCheckedException e0) {
-                        fut0.onDone(e0);
+                    if (X.hasCause(e, 
IgniteClientDisconnectedCheckedException.class, 
IgniteClientDisconnectedException.class))
+                        fut0.onDone(e);
+                    else {
+                        try {
+                            if (ctx.discovery().alive(node) && 
ctx.discovery().pingNode(node.id()))
+                                fut0.onDone(e);
+                            else
+                                fut0.onDone(new 
ClusterTopologyCheckedException("Failed to send request (node has left): "
+                                    + node.id()));
+                        }
+                        catch (IgniteClientDisconnectedCheckedException e0) {
+                            fut0.onDone(e0);
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2283df29/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 6c9a216..d66ea2c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -339,6 +339,8 @@ public class ZookeeperDiscoveryImpl {
      * @return Ping result.
      */
     public boolean pingNode(UUID nodeId) {
+        checkState();
+
         ZkRuntimeState rtState = this.rtState;
 
         ZookeeperClusterNode node = rtState.top.nodesById.get(nodeId);
@@ -2298,7 +2300,14 @@ public class ZookeeperDiscoveryImpl {
             if (log.isDebugEnabled())
                 log.debug("Update processed events: " + 
rtState.locNodeInfo.lastProcEvt);
 
-            zkClient.setData(rtState.locNodeZkPath, 
marshalZip(rtState.locNodeInfo), -1);
+            try {
+                zkClient.setData(rtState.locNodeZkPath, 
marshalZip(rtState.locNodeInfo), -1);
+            }
+            catch (KeeperException.NoNodeException e) {
+                // Possible if node is forcible failed.
+                if (log.isDebugEnabled())
+                    log.debug("Failed to update processed events, no node: " + 
rtState.locNodeInfo.lastProcEvt);
+            }
         }
 
         ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get();
@@ -2308,9 +2317,9 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
-     * @param node
-     * @param evtData
-     * @throws Exception
+     * @param node Node.
+     * @param evtData Node join event data.
+     * @throws Exception If failed.
      */
     private void readAndInitSecuritySubject(ZookeeperClusterNode node, 
ZkDiscoveryNodeJoinEventData evtData) throws Exception {
         if (evtData.secSubjPartCnt > 0) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/2283df29/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
index fa8670c..37292ff 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
@@ -210,14 +210,17 @@ public abstract class 
IgniteClientReconnectFailoverAbstractTest extends IgniteCl
             }
 
             if (err != null) {
-                log.error("Test error:" + err);
+                log.error("Test error: " + err);
 
                 U.dumpThreads(log);
 
                 CyclicBarrier barrier0 = barrier;
 
-                if (barrier0 != null)
+                if (barrier0 != null) {
+                    barrier = null;
+
                     barrier0.reset();
+                }
 
                 stop.set(true);
 

Reply via email to