Repository: ignite
Updated Branches:
  refs/heads/master 45abb9c70 -> a8d50e4cd


IGNITE-9683 Create manual pinger for ZK client - Fixes #4839.

Signed-off-by: Ivan Rakov <ira...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a8d50e4c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a8d50e4c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a8d50e4c

Branch: refs/heads/master
Commit: a8d50e4cd533225060b0805b5c977668ba11ee48
Parents: 45abb9c
Author: AMedvedev <g...@andmed.us>
Authored: Thu Sep 27 22:57:13 2018 +0300
Committer: Ivan Rakov <ira...@apache.org>
Committed: Thu Sep 27 22:57:13 2018 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/internal/ZkPinger.java     | 88 ++++++++++++++++++++
 .../discovery/zk/internal/ZookeeperClient.java  | 30 ++++++-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 13 +++
 3 files changed, 130 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a8d50e4c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPinger.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPinger.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPinger.java
new file mode 100644
index 0000000..964b8fc
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPinger.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Periodically pings ZK server with simple request. Prevents connection abort on timeout from ZK side.
+ */
+public class ZkPinger extends TimerTask {
+    /** Ping interval milliseconds. */
+    private static final int PING_INTERVAL_MS = 2000;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Zk client. */
+    private final ZooKeeper zkClient;
+
+    /** Paths. */
+    private final ZkIgnitePaths paths;
+
+    /** Scheduler. */
+    private final Timer scheduler = new Timer("ignite-zk-pinger");
+
+    /**
+     * @param log Logger.
+     * @param zkClient Zk client.
+     * @param paths Paths.
+     */
+    public ZkPinger(IgniteLogger log, ZooKeeper zkClient, ZkIgnitePaths paths) {
+        this.log = log;
+        this.zkClient = zkClient;
+        this.paths = paths;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run() {
+        try {
+            zkClient.exists(paths.clusterDir, false);
+        }
+        catch (Throwable t) {
+            if (zkClient.getState().isAlive())
+                U.warn(log, "Failed to ping Zookeeper.", t);
+            else
+                scheduler.cancel();
+        }
+
+    }
+
+    /**
+     * Starts ping process.
+     */
+    public void start() {
+        scheduler.scheduleAtFixedRate(this, 0, PING_INTERVAL_MS);
+    }
+
+    /**
+     * Stops ping process.
+     */
+    public void stop() {
+        try {
+            scheduler.cancel();
+        }
+        catch (Exception e) {
+            log.warning("Failed to cancel Zookeeper Pinger scheduler.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8d50e4c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index a2788a1..7f2cb21 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -54,6 +54,10 @@ public class ZookeeperClient implements Watcher {
     private static final int DFLT_MAX_RETRY_COUNT = 10;
 
     /** */
+    private static final boolean PINGER_ENABLED =
+        IgniteSystemProperties.getBoolean("IGNITE_ZOOKEEPER_DISCOVERY_PINGER_ENABLED", false);
+
+    /** */
     private final AtomicInteger retryCount = new AtomicInteger();
 
     /** */
@@ -95,6 +99,9 @@ public class ZookeeperClient implements Watcher {
     /** */
     private volatile boolean closing;
 
+    /** */
+    private volatile ZkPinger pinger;
+
     /**
      * @param log Logger.
      * @param connectString ZK connection string.
@@ -164,6 +171,13 @@ public class ZookeeperClient implements Watcher {
         }
     }
 
+    /**
+     * @return {@code True} if pinger is enabled
+     */
+    boolean pingerEnabled() {
+        return PINGER_ENABLED;
+    }
+
     /** */
     String state() {
         synchronized (stateMux) {
@@ -580,7 +594,6 @@ public class ZookeeperClient implements Watcher {
     }
 
     /**
-     * @param parent Parent path.
      * @param paths Children paths.
      * @param ver Version.
      * @throws ZookeeperClientFailedException If connection to zk was lost.
@@ -811,6 +824,13 @@ public class ZookeeperClient implements Watcher {
      *
      */
     public void close() {
+        if (PINGER_ENABLED) {
+            ZkPinger pinger0 = pinger;
+
+            if (pinger0 != null)
+                pinger0.stop();
+        }
+
         closeClient();
     }
 
@@ -946,6 +966,14 @@ public class ZookeeperClient implements Watcher {
     }
 
     /**
+     * @param pinger Pinger.
+     */
+    void attachPinger(ZkPinger pinger) {
+        if (PINGER_ENABLED)
+            this.pinger = pinger;
+    }
+
+    /**
      *
      */
     interface ZkAsyncOperation {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8d50e4c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index b89fbe4..3771c7b 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -787,6 +787,19 @@ public class ZookeeperDiscoveryImpl {
             }
 
             startJoin(rtState, prevState, joinDataBytes);
+
+            try {
+                if (rtState.zkClient.pingerEnabled() && !locNode.isClient() && !locNode.isDaemon()) {
+                    ZkPinger pinger = new ZkPinger(log, rtState.zkClient.zk(), zkPaths);
+
+                    rtState.zkClient.attachPinger(pinger);
+
+                    pinger.start();
+                }
+            }
+            catch (Exception e) {
+                log.error("Failed to create and attach Zookeeper pinger", e);
+            }
         }
         finally {
             busyLock.leaveBusy();