Make the ZooKeeper client used by the BookKeeper client retry on session expiry
* The ZooKeeper client used by the BookKeeper client is purely for metadata access, so it should retry on session expiry. * Remove the unnecessary ZooKeeper session handling in the BK log handler: there is no need to fail the BookKeeper client or the log handler when a session expires, because expiry is handled and retried by the ZooKeeper client. * Retry infinitely if the retry setting for the bkc ZooKeeper client is set to 0 or a negative value. RB_ID=843057 Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/f18fe172 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/f18fe172 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/f18fe172 Branch: refs/heads/merge/DL-98 Commit: f18fe172fb4559bcabb7173d1cd0b9d27fc23c93 Parents: 72a786e Author: Sijie Guo <sij...@twitter.com> Authored: Mon Jul 11 10:21:44 2016 -0700 Committer: Sijie Guo <sij...@twitter.com> Committed: Mon Dec 12 16:58:20 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/BKAsyncLogReaderDLSN.java | 15 +----- .../BKDistributedLogNamespace.java | 5 +- .../twitter/distributedlog/BKLogHandler.java | 17 ------- .../distributedlog/BookKeeperClient.java | 48 ++++---------------- .../DistributedLogConfiguration.java | 8 +++- 5 files changed, 17 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java index 7d3d53d..b1a9273 100644 --- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java @@ -49,7 +49,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Function1; @@ -73,7 +72,7 @@ import scala.runtime.AbstractFunction1; * <li> `async_reader`/idle_reader_error: counter. the number idle reader errors. * </ul> */ -class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNotifier, AsyncLogReader, Runnable, AsyncNotification { +class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotification { static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReaderDLSN.class); private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION = @@ -86,7 +85,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti protected final BKDistributedLogManager bkDistributedLogManager; protected final BKLogReadHandler bkLedgerManager; - private Watcher sessionExpireWatcher = null; private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(); private final ScheduledExecutorService executorService; private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>(); @@ -218,7 +216,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti this.executorService = executorService; this.bkLedgerManager = bkDistributedLogManager.createReadHandler(subscriberId, lockStateExecutor, this, deserializeRecordSet, true); - sessionExpireWatcher = this.bkLedgerManager.registerExpirationHandler(this); LOG.debug("Starting async reader at {}", startDLSN); 
this.startDLSN = startDLSN; this.scheduleDelayStopwatch = Stopwatch.createUnstarted(); @@ -255,14 +252,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary(); } - @Override - public void notifySessionExpired() { - // ZK Session notification is an indication to check if this has resulted in a fatal error - // of the underlying reader, in itself this reader doesnt error out unless the underlying - // reader has hit an error - scheduleBackgroundRead(); - } - private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() { if (idleErrorThresholdMillis < Integer.MAX_VALUE) { // Dont run the task more than once every seconds (for sanity) @@ -494,8 +483,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti cancelAllPendingReads(exception); - bkLedgerManager.unregister(sessionExpireWatcher); - FutureUtils.ignore(bkLedgerManager.asyncClose()).proxyTo(closePromise); return closePromise; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java index 0b522d0..7a4fd7f 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java @@ -622,13 +622,10 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { DistributedLogConfiguration conf, String zkServers, StatsLogger statsLogger) { - RetryPolicy retryPolicy = null; - if (conf.getZKNumRetries() > 0) { - retryPolicy = new BoundExponentialBackoffRetryPolicy( + 
RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy( conf.getBKClientZKRetryBackoffStartMillis(), conf.getBKClientZKRetryBackoffMaxMillis(), conf.getBKClientZKNumRetries()); - } ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder() .name(zkcName) .sessionTimeoutMs(conf.getBKClientZKSessionTimeoutMilliSeconds()) http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java index a6ec318..a84261a 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java @@ -1293,21 +1293,4 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor } } - // ZooKeeper Watchers - - Watcher registerExpirationHandler(final ZooKeeperClient.ZooKeeperSessionExpireNotifier onExpired) { - if (conf.getZKNumRetries() > 0) { - return new Watcher() { - @Override - public void process(WatchedEvent event) { - // nop - } - }; - } - return zooKeeperClient.registerExpirationHandler(onExpired); - } - - boolean unregister(Watcher watcher) { - return zooKeeperClient.unregister(watcher); - } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java index fd22b8f..c39ae4c 100644 --- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java @@ -17,6 +17,7 @@ */ package com.twitter.distributedlog; +import com.google.common.base.Optional; import com.twitter.distributedlog.ZooKeeperClient.Credentials; import com.twitter.distributedlog.ZooKeeperClient.DigestCredentials; import com.twitter.distributedlog.exceptions.AlreadyClosedException; @@ -41,16 +42,12 @@ import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.bookkeeper.zookeeper.RetryPolicy; import org.apache.commons.configuration.ConfigurationException; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.util.HashedWheelTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; - -import com.google.common.base.Optional; import static com.google.common.base.Charsets.UTF_8; @@ -62,7 +59,7 @@ import static com.google.common.base.Charsets.UTF_8; * <li> bookkeeper operation stats are exposed under current scope by {@link BookKeeper} * </ul> */ -public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireNotifier { +public class BookKeeperClient { static final Logger LOG = LoggerFactory.getLogger(BookKeeperClient.class); // Parameters to build bookkeeper client @@ -83,14 +80,10 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN // feature provider private final Optional<FeatureProvider> featureProvider; - private Watcher sessionExpireWatcher = null; - private AtomicBoolean zkSessionExpired = new AtomicBoolean(false); - @SuppressWarnings("deprecation") private synchronized void commonInitialization( DistributedLogConfiguration conf, String ledgersPath, - ClientSocketChannelFactory channelFactory, 
StatsLogger statsLogger, HashedWheelTimer requestTimer, - boolean registerExpirationHandler) + ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer requestTimer) throws IOException, InterruptedException, KeeperException { ClientConfiguration bkConfig = new ClientConfiguration(); bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout()); @@ -124,10 +117,6 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN .requestTimer(requestTimer) .featureProvider(featureProvider.orNull()) .build(); - - if (registerExpirationHandler) { - sessionExpireWatcher = this.zkc.registerExpirationHandler(this); - } } BookKeeperClient(DistributedLogConfiguration conf, @@ -159,16 +148,11 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN if (null != this.bkc) { return; } - boolean registerExpirationHandler; if (null == this.zkc) { int zkSessionTimeout = conf.getBKClientZKSessionTimeoutMilliSeconds(); - RetryPolicy retryPolicy = null; - if (conf.getBKClientZKNumRetries() > 0) { - retryPolicy = new BoundExponentialBackoffRetryPolicy( + RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy( conf.getBKClientZKRetryBackoffStartMillis(), conf.getBKClientZKRetryBackoffMaxMillis(), conf.getBKClientZKNumRetries()); - } - Credentials credentials = Credentials.NONE; if (conf.getZkAclId() != null) { credentials = new DigestCredentials(conf.getZkAclId(), conf.getZkAclId()); @@ -178,10 +162,9 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN retryPolicy, statsLogger.scope("bkc_zkc"), conf.getZKClientNumberRetryThreads(), conf.getBKClientZKRequestRateLimit(), credentials); } - registerExpirationHandler = conf.getBKClientZKNumRetries() <= 0; try { - commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer, registerExpirationHandler); + commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer); } catch 
(InterruptedException e) { throw new DLInterruptedException("Interrupted on creating bookkeeper client " + name + " : ", e); } catch (KeeperException e) { @@ -190,18 +173,18 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN if (ownZK) { LOG.info("BookKeeper Client created {} with its own ZK Client : ledgersPath = {}, numRetries = {}, " + - "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}, registerExpirationHandler = {}", + "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}", new Object[] { name, ledgersPath, conf.getBKClientZKNumRetries(), conf.getBKClientZKSessionTimeoutMilliSeconds(), conf.getBKClientZKRetryBackoffStartMillis(), conf.getBKClientZKRetryBackoffMaxMillis(), - conf.getBkDNSResolverOverrides(), registerExpirationHandler }); + conf.getBkDNSResolverOverrides() }); } else { LOG.info("BookKeeper Client created {} with shared zookeeper client : ledgersPath = {}, numRetries = {}, " + - "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}, registerExpirationHandler = {}", + "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}", new Object[] { name, ledgersPath, conf.getZKNumRetries(), conf.getZKSessionTimeoutMilliseconds(), conf.getZKRetryBackoffStartMillis(), conf.getZKRetryBackoffMaxMillis(), - conf.getBkDNSResolverOverrides(), registerExpirationHandler }); + conf.getBkDNSResolverOverrides() }); } } @@ -284,9 +267,6 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN } } if (null != zkc) { - if (null != sessionExpireWatcher) { - zkc.unregister(sessionExpireWatcher); - } if (ownZK) { zkc.close(); } @@ -294,20 +274,10 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN closed = true; } - @Override - public void notifySessionExpired() { - zkSessionExpired.set(true); - } - public synchronized void checkClosedOrInError() throws AlreadyClosedException { if (closed) { LOG.error("BookKeeper 
Client {} is already closed", name); throw new AlreadyClosedException("BookKeeper Client " + name + " is already closed"); } - - if (zkSessionExpired.get()) { - LOG.error("BookKeeper Client {}'s Zookeeper session has expired", name); - throw new AlreadyClosedException("BookKeeper Client " + name + "'s Zookeeper session has expired"); - } } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java index c2057df..5d0e59a 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java @@ -809,12 +809,16 @@ public class DistributedLogConfiguration extends CompositeConfiguration { * Get num of retries for zookeeper client that used by bookkeeper client. * <p>Retries only happen on retryable failures like session expired, * session moved. for permanent failures, the request will fail immediately. - * The default value is 3. + * The default value is 3. Setting it to zero or negative will retry infinitely. * * @return num of retries of zookeeper client used by bookkeeper client. */ public int getBKClientZKNumRetries() { - return this.getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT); + int zkNumRetries = this.getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT); + if (zkNumRetries <= 0) { + return Integer.MAX_VALUE; + } + return zkNumRetries; } /**