Repository: ignite
Updated Branches:
  refs/heads/master 45abb9c70 -> a8d50e4cd


IGNITE-9683 Create manual pinger for ZK client - Fixes #4839.

Signed-off-by: Ivan Rakov <ira...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a8d50e4c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a8d50e4c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a8d50e4c

Branch: refs/heads/master
Commit: a8d50e4cd533225060b0805b5c977668ba11ee48
Parents: 45abb9c
Author: AMedvedev <g...@andmed.us>
Authored: Thu Sep 27 22:57:13 2018 +0300
Committer: Ivan Rakov <ira...@apache.org>
Committed: Thu Sep 27 22:57:13 2018 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/internal/ZkPinger.java     | 88 ++++++++++++++++++++
 .../discovery/zk/internal/ZookeeperClient.java  | 30 ++++++-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 13 +++
 3 files changed, 130 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a8d50e4c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPinger.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPinger.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPinger.java
new file mode 100644
index 0000000..964b8fc
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPinger.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Periodically pings ZK server with simple request. Prevents connection abort on timeout from ZK side.
+ * <p>
+ * Pings run on a dedicated daemon timer thread with a fixed delay of {@code PING_INTERVAL_MS} between them.
+ * The instance is single-use: once stopped or self-cancelled it cannot be started again.
+ */
+public class ZkPinger extends TimerTask {
+    /** Ping interval milliseconds. */
+    private static final int PING_INTERVAL_MS = 2000;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Zk client. */
+    private final ZooKeeper zkClient;
+
+    /** Paths. */
+    private final ZkIgnitePaths paths;
+
+    /** Scheduler. Daemon, so a pinger that was never stopped cannot keep the JVM alive. */
+    private final Timer scheduler = new Timer("ignite-zk-pinger", true);
+
+    /**
+     * @param log Logger.
+     * @param zkClient Zk client.
+     * @param paths Paths.
+     */
+    public ZkPinger(IgniteLogger log, ZooKeeper zkClient, ZkIgnitePaths paths) {
+        this.log = log;
+        this.zkClient = zkClient;
+        this.paths = paths;
+    }
+
+    /**
+     * Sends one ping. On failure logs a warning while the ZK client is alive, otherwise cancels the scheduler.
+     */
+    @Override public void run() {
+        try {
+            zkClient.exists(paths.clusterDir, false);
+        }
+        catch (Throwable t) {
+            if (zkClient.getState().isAlive())
+                U.warn(log, "Failed to ping Zookeeper.", t);
+            else
+                scheduler.cancel();
+        }
+    }
+
+    /**
+     * Starts ping process. Must be called at most once: a timer task cannot be scheduled twice.
+     */
+    public void start() {
+        // Fixed delay, not fixed rate: pings missed during a stall must not be replayed in a burst.
+        scheduler.schedule(this, 0, PING_INTERVAL_MS);
+    }
+
+    /**
+     * Stops ping process. Repeated calls have no effect.
+     */
+    public void stop() {
+        scheduler.cancel();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8d50e4c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index a2788a1..7f2cb21 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -54,6 +54,10 @@ public class ZookeeperClient implements Watcher {
     private static final int DFLT_MAX_RETRY_COUNT = 10;
 
     /** */
+    private static final boolean PINGER_ENABLED =
+        IgniteSystemProperties.getBoolean("IGNITE_ZOOKEEPER_DISCOVERY_PINGER_ENABLED", false);
+
+    /** */
     private final AtomicInteger retryCount = new AtomicInteger();
 
     /** */
@@ -95,6 +99,9 @@ public class ZookeeperClient implements Watcher {
     /** */
     private volatile boolean closing;
 
+    /** */
+    private volatile ZkPinger pinger;
+
     /**
      * @param log Logger.
      * @param connectString ZK connection string.
@@ -164,6 +171,13 @@ public class ZookeeperClient implements Watcher {
         }
     }
 
+    /**
+     * @return {@code True} if pinger is enabled by {@code IGNITE_ZOOKEEPER_DISCOVERY_PINGER_ENABLED} (off by default).
+     */
+    boolean pingerEnabled() {
+        return PINGER_ENABLED;
+    }
+
     /** */
     String state() {
         synchronized (stateMux) {
@@ -580,7 +594,6 @@ public class ZookeeperClient implements Watcher {
     }
 
     /**
-     * @param parent Parent path.
      * @param paths Children paths.
      * @param ver Version.
      * @throws ZookeeperClientFailedException If connection to zk was lost.
@@ -811,6 +824,13 @@ public class ZookeeperClient implements Watcher {
      *
      */
     public void close() {
+        if (PINGER_ENABLED) {
+            ZkPinger pinger0 = pinger; // Read the volatile field once for both the check and the call.
+
+            if (pinger0 != null)
+                pinger0.stop();
+        }
+
         closeClient();
     }
 
@@ -946,6 +966,14 @@ public class ZookeeperClient implements Watcher {
     }
 
     /**
+     * @param pinger Pinger to stop when this client is closed; ignored if pinger is disabled.
+     */
+    void attachPinger(ZkPinger pinger) {
+        if (PINGER_ENABLED)
+            this.pinger = pinger;
+    }
+
+    /**
      *
      */
     interface ZkAsyncOperation {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8d50e4c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index b89fbe4..3771c7b 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -787,6 +787,19 @@ public class ZookeeperDiscoveryImpl {
             }
 
             startJoin(rtState, prevState, joinDataBytes);
+
+            try {
+                if (rtState.zkClient.pingerEnabled() && !locNode.isClient() && !locNode.isDaemon()) {
+                    ZkPinger pinger = new ZkPinger(log, rtState.zkClient.zk(), zkPaths);
+
+                    rtState.zkClient.attachPinger(pinger);
+
+                    pinger.start();
+                }
+            }
+            catch (Exception e) {
+                log.error("Failed to create and attach Zookeeper pinger", e);
+            }
         }
         finally {
             busyLock.leaveBusy();

Reply via email to