Make the zookeeper client used by bookkeeper client retry on session expiry

* The zookeeper client used by the bookkeeper client is purely for metadata
accesses, so it should retry when its session expires.
* Remove the unnecessary zookeeper session handling in the bk log handler. We
don't need to fail the bookkeeper client or log handler when the session
expires, as that is handled and retried by the zookeeper client.
* Retry infinitely if the number of retries for the bkc zookeeper client is
set to 0 or a negative value.

RB_ID=843057


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/f18fe172
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/f18fe172
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/f18fe172

Branch: refs/heads/merge/DL-98
Commit: f18fe172fb4559bcabb7173d1cd0b9d27fc23c93
Parents: 72a786e
Author: Sijie Guo <sij...@twitter.com>
Authored: Mon Jul 11 10:21:44 2016 -0700
Committer: Sijie Guo <sij...@twitter.com>
Committed: Mon Dec 12 16:58:20 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKAsyncLogReaderDLSN.java    | 15 +-----
 .../BKDistributedLogNamespace.java              |  5 +-
 .../twitter/distributedlog/BKLogHandler.java    | 17 -------
 .../distributedlog/BookKeeperClient.java        | 48 ++++----------------
 .../DistributedLogConfiguration.java            |  8 +++-
 5 files changed, 17 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
index 7d3d53d..b1a9273 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
@@ -49,7 +49,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Function1;
@@ -73,7 +72,7 @@ import scala.runtime.AbstractFunction1;
  * <li> `async_reader`/idle_reader_error: counter. the number idle reader 
errors.
  * </ul>
  */
-class BKAsyncLogReaderDLSN implements 
ZooKeeperClient.ZooKeeperSessionExpireNotifier, AsyncLogReader, Runnable, 
AsyncNotification {
+class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, 
AsyncNotification {
     static final Logger LOG = 
LoggerFactory.getLogger(BKAsyncLogReaderDLSN.class);
 
     private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> 
READ_NEXT_MAP_FUNCTION =
@@ -86,7 +85,6 @@ class BKAsyncLogReaderDLSN implements 
ZooKeeperClient.ZooKeeperSessionExpireNoti
 
     protected final BKDistributedLogManager bkDistributedLogManager;
     protected final BKLogReadHandler bkLedgerManager;
-    private Watcher sessionExpireWatcher = null;
     private final AtomicReference<Throwable> lastException = new 
AtomicReference<Throwable>();
     private final ScheduledExecutorService executorService;
     private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = 
new ConcurrentLinkedQueue<PendingReadRequest>();
@@ -218,7 +216,6 @@ class BKAsyncLogReaderDLSN implements 
ZooKeeperClient.ZooKeeperSessionExpireNoti
         this.executorService = executorService;
         this.bkLedgerManager = 
bkDistributedLogManager.createReadHandler(subscriberId,
                 lockStateExecutor, this, deserializeRecordSet, true);
-        sessionExpireWatcher = 
this.bkLedgerManager.registerExpirationHandler(this);
         LOG.debug("Starting async reader at {}", startDLSN);
         this.startDLSN = startDLSN;
         this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
@@ -255,14 +252,6 @@ class BKAsyncLogReaderDLSN implements 
ZooKeeperClient.ZooKeeperSessionExpireNoti
         this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary();
     }
 
-    @Override
-    public void notifySessionExpired() {
-        // ZK Session notification is an indication to check if this has 
resulted in a fatal error
-        // of the underlying reader, in itself this reader doesnt error out 
unless the underlying
-        // reader has hit an error
-        scheduleBackgroundRead();
-    }
-
     private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
         if (idleErrorThresholdMillis < Integer.MAX_VALUE) {
             // Dont run the task more than once every seconds (for sanity)
@@ -494,8 +483,6 @@ class BKAsyncLogReaderDLSN implements 
ZooKeeperClient.ZooKeeperSessionExpireNoti
 
         cancelAllPendingReads(exception);
 
-        bkLedgerManager.unregister(sessionExpireWatcher);
-
         FutureUtils.ignore(bkLedgerManager.asyncClose()).proxyTo(closePromise);
         return closePromise;
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 0b522d0..7a4fd7f 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -622,13 +622,10 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
                                                                   
DistributedLogConfiguration conf,
                                                                   String 
zkServers,
                                                                   StatsLogger 
statsLogger) {
-        RetryPolicy retryPolicy = null;
-        if (conf.getZKNumRetries() > 0) {
-            retryPolicy = new BoundExponentialBackoffRetryPolicy(
+        RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
                     conf.getBKClientZKRetryBackoffStartMillis(),
                     conf.getBKClientZKRetryBackoffMaxMillis(),
                     conf.getBKClientZKNumRetries());
-        }
         ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder()
                 .name(zkcName)
                 
.sessionTimeoutMs(conf.getBKClientZKSessionTimeoutMilliSeconds())

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index a6ec318..a84261a 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -1293,21 +1293,4 @@ public abstract class BKLogHandler implements Watcher, 
AsyncCloseable, AsyncAbor
         }
     }
 
-    // ZooKeeper Watchers
-
-    Watcher registerExpirationHandler(final 
ZooKeeperClient.ZooKeeperSessionExpireNotifier onExpired) {
-        if (conf.getZKNumRetries() > 0) {
-            return new Watcher() {
-                @Override
-                public void process(WatchedEvent event) {
-                    // nop
-                }
-            };
-        }
-        return zooKeeperClient.registerExpirationHandler(onExpired);
-    }
-
-    boolean unregister(Watcher watcher) {
-        return zooKeeperClient.unregister(watcher);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
index fd22b8f..c39ae4c 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
@@ -17,6 +17,7 @@
  */
 package com.twitter.distributedlog;
 
+import com.google.common.base.Optional;
 import com.twitter.distributedlog.ZooKeeperClient.Credentials;
 import com.twitter.distributedlog.ZooKeeperClient.DigestCredentials;
 import com.twitter.distributedlog.exceptions.AlreadyClosedException;
@@ -41,16 +42,12 @@ import 
org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.RetryPolicy;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.base.Optional;
 
 import static com.google.common.base.Charsets.UTF_8;
 
@@ -62,7 +59,7 @@ import static com.google.common.base.Charsets.UTF_8;
  * <li> bookkeeper operation stats are exposed under current scope by {@link 
BookKeeper}
  * </ul>
  */
-public class BookKeeperClient implements 
ZooKeeperClient.ZooKeeperSessionExpireNotifier {
+public class BookKeeperClient {
     static final Logger LOG = LoggerFactory.getLogger(BookKeeperClient.class);
 
     // Parameters to build bookkeeper client
@@ -83,14 +80,10 @@ public class BookKeeperClient implements 
ZooKeeperClient.ZooKeeperSessionExpireN
     // feature provider
     private final Optional<FeatureProvider> featureProvider;
 
-    private Watcher sessionExpireWatcher = null;
-    private AtomicBoolean zkSessionExpired = new AtomicBoolean(false);
-
     @SuppressWarnings("deprecation")
     private synchronized void commonInitialization(
             DistributedLogConfiguration conf, String ledgersPath,
-            ClientSocketChannelFactory channelFactory, StatsLogger 
statsLogger, HashedWheelTimer requestTimer,
-            boolean registerExpirationHandler)
+            ClientSocketChannelFactory channelFactory, StatsLogger 
statsLogger, HashedWheelTimer requestTimer)
         throws IOException, InterruptedException, KeeperException {
         ClientConfiguration bkConfig = new ClientConfiguration();
         bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout());
@@ -124,10 +117,6 @@ public class BookKeeperClient implements 
ZooKeeperClient.ZooKeeperSessionExpireN
             .requestTimer(requestTimer)
             .featureProvider(featureProvider.orNull())
             .build();
-
-        if (registerExpirationHandler) {
-            sessionExpireWatcher = this.zkc.registerExpirationHandler(this);
-        }
     }
 
     BookKeeperClient(DistributedLogConfiguration conf,
@@ -159,16 +148,11 @@ public class BookKeeperClient implements 
ZooKeeperClient.ZooKeeperSessionExpireN
         if (null != this.bkc) {
             return;
         }
-        boolean registerExpirationHandler;
         if (null == this.zkc) {
             int zkSessionTimeout = 
conf.getBKClientZKSessionTimeoutMilliSeconds();
-            RetryPolicy retryPolicy = null;
-            if (conf.getBKClientZKNumRetries() > 0) {
-                retryPolicy = new BoundExponentialBackoffRetryPolicy(
+            RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
                         conf.getBKClientZKRetryBackoffStartMillis(),
                         conf.getBKClientZKRetryBackoffMaxMillis(), 
conf.getBKClientZKNumRetries());
-            }
-
             Credentials credentials = Credentials.NONE;
             if (conf.getZkAclId() != null) {
                 credentials = new DigestCredentials(conf.getZkAclId(), 
conf.getZkAclId());
@@ -178,10 +162,9 @@ public class BookKeeperClient implements 
ZooKeeperClient.ZooKeeperSessionExpireN
                                            retryPolicy, 
statsLogger.scope("bkc_zkc"), conf.getZKClientNumberRetryThreads(),
                                            
conf.getBKClientZKRequestRateLimit(), credentials);
         }
-        registerExpirationHandler = conf.getBKClientZKNumRetries() <= 0;
 
         try {
-            commonInitialization(conf, ledgersPath, channelFactory, 
statsLogger, requestTimer, registerExpirationHandler);
+            commonInitialization(conf, ledgersPath, channelFactory, 
statsLogger, requestTimer);
         } catch (InterruptedException e) {
             throw new DLInterruptedException("Interrupted on creating 
bookkeeper client " + name + " : ", e);
         } catch (KeeperException e) {
@@ -190,18 +173,18 @@ public class BookKeeperClient implements 
ZooKeeperClient.ZooKeeperSessionExpireN
 
         if (ownZK) {
             LOG.info("BookKeeper Client created {} with its own ZK Client : 
ledgersPath = {}, numRetries = {}, " +
-                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, 
dnsResolver = {}, registerExpirationHandler = {}",
+                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, 
dnsResolver = {}",
                     new Object[] { name, ledgersPath,
                     conf.getBKClientZKNumRetries(), 
conf.getBKClientZKSessionTimeoutMilliSeconds(),
                     conf.getBKClientZKRetryBackoffStartMillis(), 
conf.getBKClientZKRetryBackoffMaxMillis(),
-                    conf.getBkDNSResolverOverrides(), 
registerExpirationHandler });
+                    conf.getBkDNSResolverOverrides() });
         } else {
             LOG.info("BookKeeper Client created {} with shared zookeeper 
client : ledgersPath = {}, numRetries = {}, " +
-                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, 
dnsResolver = {}, registerExpirationHandler = {}",
+                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, 
dnsResolver = {}",
                     new Object[] { name, ledgersPath,
                     conf.getZKNumRetries(), 
conf.getZKSessionTimeoutMilliseconds(),
                     conf.getZKRetryBackoffStartMillis(), 
conf.getZKRetryBackoffMaxMillis(),
-                    conf.getBkDNSResolverOverrides(), 
registerExpirationHandler });
+                    conf.getBkDNSResolverOverrides() });
         }
     }
 
@@ -284,9 +267,6 @@ public class BookKeeperClient implements 
ZooKeeperClient.ZooKeeperSessionExpireN
             }
         }
         if (null != zkc) {
-            if (null != sessionExpireWatcher) {
-                zkc.unregister(sessionExpireWatcher);
-            }
             if (ownZK) {
                 zkc.close();
             }
@@ -294,20 +274,10 @@ public class BookKeeperClient implements 
ZooKeeperClient.ZooKeeperSessionExpireN
         closed = true;
     }
 
-    @Override
-    public void notifySessionExpired() {
-        zkSessionExpired.set(true);
-    }
-
     public synchronized void checkClosedOrInError() throws 
AlreadyClosedException {
         if (closed) {
             LOG.error("BookKeeper Client {} is already closed", name);
             throw new AlreadyClosedException("BookKeeper Client " + name + " 
is already closed");
         }
-
-        if (zkSessionExpired.get()) {
-            LOG.error("BookKeeper Client {}'s Zookeeper session has expired", 
name);
-            throw new AlreadyClosedException("BookKeeper Client " + name + "'s 
Zookeeper session has expired");
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index c2057df..5d0e59a 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -809,12 +809,16 @@ public class DistributedLogConfiguration extends 
CompositeConfiguration {
      * Get num of retries for zookeeper client that used by bookkeeper client.
      * <p>Retries only happen on retryable failures like session expired,
      * session moved. for permanent failures, the request will fail 
immediately.
-     * The default value is 3.
+     * The default value is 3. Setting it to zero or negative will retry 
infinitely.
      *
      * @return num of retries of zookeeper client used by bookkeeper client.
      */
     public int getBKClientZKNumRetries() {
-        return this.getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, 
BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT);
+        int zkNumRetries = this.getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, 
BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT);
+        if (zkNumRetries <= 0) {
+            return Integer.MAX_VALUE;
+        }
+        return zkNumRetries;
     }
 
     /**

Reply via email to