DL-31: Provide flag to disable zk based distributed lock. DL doesn't enforce any leader election. However, it still provides a ZooKeeper ephemeral-znode-based lock for leader election. This lock is unnecessary if applications that use the core library directly already have their own leader election mechanism.
This change is to provide a flag to allow disable the zk based lock. Author: Sijie Guo <[email protected]> Reviewers: Leigh Stewart <[email protected]> Closes #9 from sijie/sijie/flag_to_disable_lock Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/89613fb7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/89613fb7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/89613fb7 Branch: refs/heads/master Commit: 89613fb75ad282b0037166205ae68ea07ae76024 Parents: b23291a Author: Sijie Guo <[email protected]> Authored: Mon Aug 22 18:05:39 2016 -0700 Committer: Sijie Guo <[email protected]> Committed: Mon Aug 22 18:05:39 2016 -0700 ---------------------------------------------------------------------- .../distributedlog/BKDistributedLogManager.java | 25 +- .../distributedlog/BKLogReadHandler.java | 9 +- .../distributedlog/BKLogWriteHandler.java | 4 +- .../DistributedLogConfiguration.java | 26 + .../distributedlog/LocalDLMEmulator.java | 34 +- .../distributedlog/lock/DistributedLock.java | 514 +----------------- .../distributedlog/lock/NopDistributedLock.java | 34 ++ .../distributedlog/lock/SessionLock.java | 2 +- .../distributedlog/lock/ZKDistributedLock.java | 537 +++++++++++++++++++ .../com/twitter/distributedlog/DLMTestUtil.java | 1 + .../distributedlog/TestAsyncReaderWriter.java | 1 - .../TestBKDistributedLogManager.java | 24 + .../distributedlog/TestBKLogSegmentWriter.java | 56 +- .../distributedlog/TestDistributedLogBase.java | 12 +- .../lock/TestDistributedLock.java | 86 +-- .../service/DistributedLogCluster.java | 9 +- 16 files changed, 754 insertions(+), 620 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java 
---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java index 9c19381..a5be03c 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java @@ -36,8 +36,10 @@ import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore; import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader; import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter; import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.distributedlog.lock.SessionLockFactory; import com.twitter.distributedlog.lock.DistributedLock; +import com.twitter.distributedlog.lock.NopDistributedLock; +import com.twitter.distributedlog.lock.SessionLockFactory; +import com.twitter.distributedlog.lock.ZKDistributedLock; import com.twitter.distributedlog.lock.ZKSessionLockFactory; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.metadata.BKDLConfig; @@ -100,9 +102,9 @@ import java.util.concurrent.TimeUnit; * scope `writer_future_pool`. See {@link MonitoredFuturePool} for detail stats. * <li> `reader_future_pool/*`: metrics about the future pools that used by readers are exposed under * scope `reader_future_pool`. See {@link MonitoredFuturePool} for detail stats. - * <li> `lock/*`: metrics about the locks used by writers. See {@link DistributedLock} for detail + * <li> `lock/*`: metrics about the locks used by writers. See {@link ZKDistributedLock} for detail * stats. - * <li> `read_lock/*`: metrics about the locks used by readers. See {@link DistributedLock} for + * <li> `read_lock/*`: metrics about the locks used by readers. See {@link ZKDistributedLock} for * detail stats. 
* <li> `logsegments/*`: metrics about basic operations on log segments. See {@link BKLogHandler} for details. * <li> `segments/*`: metrics about write operations on log segments. See {@link BKLogWriteHandler} for details. @@ -604,12 +606,17 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL final Promise<BKLogWriteHandler> createPromise) { OrderedScheduler lockStateExecutor = getLockStateExecutor(true); // Build the locks - DistributedLock lock = new DistributedLock( - lockStateExecutor, - getLockFactory(true), - logMetadata.getLockPath(), - conf.getLockTimeoutMilliSeconds(), - statsLogger); + DistributedLock lock; + if (conf.isWriteLockEnabled()) { + lock = new ZKDistributedLock( + lockStateExecutor, + getLockFactory(true), + logMetadata.getLockPath(), + conf.getLockTimeoutMilliSeconds(), + statsLogger); + } else { + lock = NopDistributedLock.INSTANCE; + } // Build the ledger allocator LedgerAllocator allocator; try { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java index 80f1270..0bf6b84 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java @@ -30,7 +30,9 @@ import com.twitter.distributedlog.exceptions.LockingException; import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader; import com.twitter.distributedlog.injector.AsyncFailureInjector; +import com.twitter.distributedlog.lock.DistributedLock; import com.twitter.distributedlog.lock.SessionLockFactory; +import 
com.twitter.distributedlog.lock.ZKDistributedLock; import com.twitter.distributedlog.lock.ZKSessionLockFactory; import com.twitter.distributedlog.logsegment.LogSegmentFilter; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; @@ -39,7 +41,6 @@ import com.twitter.distributedlog.stats.BroadCastStatsLogger; import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.distributedlog.lock.DistributedLock; import com.twitter.distributedlog.util.Utils; import com.twitter.util.ExceptionalFunction; import com.twitter.util.ExceptionalFunction0; @@ -98,7 +99,7 @@ import scala.runtime.BoxedUnit; * becoming idle. * </ul> * <h4>Read Lock</h4> - * All read lock related stats are exposed under scope `read_lock`. See {@link DistributedLock} + * All read lock related stats are exposed under scope `read_lock`. See {@link ZKDistributedLock} * for detail stats. */ class BKLogReadHandler extends BKLogHandler { @@ -216,7 +217,7 @@ class BKLogReadHandler extends BKLogHandler { public DistributedLock applyE() throws IOException { // Unfortunately this has a blocking call which we should not execute on the // ZK completion thread - BKLogReadHandler.this.readLock = new DistributedLock( + BKLogReadHandler.this.readLock = new ZKDistributedLock( lockStateExecutor, lockFactory, readLockPath, @@ -247,7 +248,7 @@ class BKLogReadHandler extends BKLogHandler { * executor service thread. */ Future<Void> acquireLockOnExecutorThread(DistributedLock lock) throws LockingException { - final Future<DistributedLock> acquireFuture = lock.asyncAcquire(); + final Future<? extends DistributedLock> acquireFuture = lock.asyncAcquire(); // The future we return must be satisfied on an executor service thread. 
If we simply // return the future returned by asyncAcquire, user callbacks may end up running in http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java index 630d626..d73c5e2 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java @@ -110,7 +110,7 @@ class BKLogWriteHandler extends BKLogHandler { protected final int regionId; protected volatile boolean closed = false; protected final RollingPolicy rollingPolicy; - protected Future<DistributedLock> lockFuture = null; + protected Future<? extends DistributedLock> lockFuture = null; protected final PermitLimiter writeLimiter; protected final FeatureProvider featureProvider; protected final DynamicDistributedLogConfiguration dynConf; @@ -337,7 +337,7 @@ class BKLogWriteHandler extends BKLogHandler { * * @return future represents the lock result */ - Future<DistributedLock> lockHandler() { + Future<? 
extends DistributedLock> lockHandler() { if (null != lockFuture) { return lockFuture; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java index 0d69f4a..d2af862 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java @@ -267,6 +267,8 @@ public class DistributedLogConfiguration extends CompositeConfiguration { public static final int BKDL_LOGSEGMENT_ROLLING_CONCURRENCY_DEFAULT = 1; // Lock Settings + public static final String BKDL_WRITE_LOCK_ENABLED = "writeLockEnabled"; + public static final boolean BKDL_WRITE_LOCK_ENABLED_DEFAULT = true; public static final String BKDL_LOCK_TIMEOUT = "lockTimeoutSeconds"; public static final long BKDL_LOCK_TIMEOUT_DEFAULT = 30; public static final String BKDL_LOCK_REACQUIRE_TIMEOUT = "lockReacquireTimeoutSeconds"; @@ -2039,6 +2041,30 @@ public class DistributedLogConfiguration extends CompositeConfiguration { // /** + * Is lock enabled when opening a writer to write a stream? + * <p> We don't generally require a lock to write a stream to guarantee correctness. The lock + * is more on tracking ownerships. The built-in fencing mechanism is used guarantee correctness + * during stream owner failover. It is okay to disable lock if your application knows which nodes + * have to write which streams. + * + * @return true if lock is enabled, otherwise false. 
+ */ + public boolean isWriteLockEnabled() { + return this.getBoolean(BKDL_WRITE_LOCK_ENABLED, BKDL_WRITE_LOCK_ENABLED_DEFAULT); + } + + /** + * Enable lock for opening a writer to write a stream? + * + * @param enabled flag to enable or disable lock for opening a writer to write a stream. + * @return distributedlog configuration. + */ + public DistributedLogConfiguration setWriteLockEnabled(boolean enabled) { + setProperty(BKDL_WRITE_LOCK_ENABLED, enabled); + return this; + } + + /** * Get lock timeout in milliseconds. The default value is 30. * * @return lock timeout in milliseconds http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java index aaecdd5..85a370f 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java @@ -34,10 +34,8 @@ import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; import java.io.File; import java.io.IOException; -import java.io.InputStreamReader; import java.net.BindException; import java.net.URI; import java.util.ArrayList; @@ -118,9 +116,12 @@ public class LocalDLMEmulator { conf = (ServerConfiguration) DEFAULT_SERVER_CONFIGURATION.clone(); conf.setZkTimeout(zkTimeoutSec * 1000); } + ServerConfiguration newConf = new ServerConfiguration(); + newConf.loadConf(conf); + newConf.setAllowLoopback(true); return new LocalDLMEmulator(numBookies, shouldStartZK, zkHost, zkPort, - initialBookiePort, zkTimeoutSec, conf); + initialBookiePort, zkTimeoutSec, newConf); } } @@ -128,30 +129,6 @@ public 
class LocalDLMEmulator { return new Builder(); } - public LocalDLMEmulator(final int numBookies) throws Exception { - this(numBookies, true, DEFAULT_ZK_HOST, DEFAULT_ZK_PORT, DEFAULT_BOOKIE_INITIAL_PORT); - } - - public LocalDLMEmulator(final int numBookies, final String zkHost, final int zkPort) throws Exception { - this(numBookies, false, zkHost, zkPort, DEFAULT_BOOKIE_INITIAL_PORT); - } - - public LocalDLMEmulator(final int numBookies, final String zkHost, final int zkPort, final ServerConfiguration serverConf) throws Exception { - this(numBookies, false, zkHost, zkPort, DEFAULT_BOOKIE_INITIAL_PORT, DEFAULT_ZK_TIMEOUT_SEC, serverConf); - } - - public LocalDLMEmulator(final int numBookies, final int initialBookiePort) throws Exception { - this(numBookies, true, DEFAULT_ZK_HOST, DEFAULT_ZK_PORT, initialBookiePort); - } - - public LocalDLMEmulator(final int numBookies, final String zkHost, final int zkPort, final int initialBookiePort) throws Exception { - this(numBookies, false, zkHost, zkPort, initialBookiePort); - } - - private LocalDLMEmulator(final int numBookies, final boolean shouldStartZK, final String zkHost, final int zkPort, final int initialBookiePort) throws Exception { - this(numBookies, shouldStartZK, zkHost, zkPort, initialBookiePort, DEFAULT_ZK_TIMEOUT_SEC, new ServerConfiguration()); - } - private LocalDLMEmulator(final int numBookies, final boolean shouldStartZK, final String zkHost, final int zkPort, final int initialBookiePort, final int zkTimeoutSec, final ServerConfiguration serverConf) throws Exception { this.numBookies = numBookies; this.zkHost = zkHost; @@ -162,7 +139,9 @@ public class LocalDLMEmulator { this.bkStartupThread = new Thread() { public void run() { try { + LOG.info("Starting {} bookies : allowLoopback = {}", numBookies, serverConf.getAllowLoopback()); LocalBookKeeper.startLocalBookies(zkHost, zkPort, numBookies, shouldStartZK, initialBookiePort, serverConf); + LOG.info("{} bookies are started."); } catch (InterruptedException 
e) { // go away quietly } catch (Exception e) { @@ -205,6 +184,7 @@ public class LocalDLMEmulator { ServerConfiguration bookieConf = new ServerConfiguration(); bookieConf.setZkTimeout(zkTimeoutSec * 1000); bookieConf.setBookiePort(0); + bookieConf.setAllowLoopback(true); File tmpdir = File.createTempFile("bookie" + UUID.randomUUID() + "_", "test"); if (!tmpdir.delete()) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLock.java index ddff9c4..0369946 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLock.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLock.java @@ -1,519 +1,37 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ package com.twitter.distributedlog.lock; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Stopwatch; import com.twitter.distributedlog.exceptions.LockingException; -import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; -import com.twitter.distributedlog.exceptions.UnexpectedException; import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.FutureUtils.OrderedFutureEventListener; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.util.Function; import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction0; -import scala.runtime.BoxedUnit; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** - * Distributed lock, using ZooKeeper. - * <p/> - * The lock is vulnerable to timing issues. For example, the process could - * encounter a really long GC cycle between acquiring the lock, and writing to - * a ledger. This could have timed out the lock, and another process could have - * acquired the lock and started writing to bookkeeper. Therefore other - * mechanisms are required to ensure correctness (i.e. Fencing). - * <p/> - * The lock is only allowed to acquire once. If the lock is acquired successfully, - * the caller holds the ownership until it loses the ownership either because of - * others already acquired the lock when session expired or explicitly close it. - * <p> - * The caller could use {@link #checkOwnership()} or {@link #checkOwnershipAndReacquire()} - * to check if it still holds the lock. 
If it doesn't hold the lock, the caller should - * give up the ownership and close the lock. - * <h3>Metrics</h3> - * All the lock related stats are exposed under `lock`. - * <ul> - * <li>lock/acquire: opstats. latency spent on acquiring a lock. - * <li>lock/reacquire: opstats. latency spent on re-acquiring a lock. - * <li>lock/internalTryRetries: counter. the number of retries on re-creating internal locks. - * </ul> - * Other internal lock related stats are also exposed under `lock`. See {@link SessionLock} - * for details. + * Interface for distributed locking */ -public class DistributedLock implements LockListener, AsyncCloseable { - - static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class); - - private final SessionLockFactory lockFactory; - private final OrderedScheduler lockStateExecutor; - private final String lockPath; - private final long lockTimeout; - private final DistributedLockContext lockContext = new DistributedLockContext(); - - // We have two lock acquire futures: - // 1. lock acquire future: for the initial acquire op - // 2. lock reacquire future: for reacquire necessary when session expires, lock is closed - private Future<DistributedLock> lockAcquireFuture = null; - private Future<DistributedLock> lockReacquireFuture = null; - // following variable tracking the status of acquire process - // => create (internalLock) => tryLock (tryLockFuture) => waitForAcquire (lockWaiter) - private SessionLock internalLock = null; - private Future<LockWaiter> tryLockFuture = null; - private LockWaiter lockWaiter = null; - // exception indicating if the reacquire failed - private LockingException lockReacquireException = null; - // closeFuture - private volatile boolean closed = false; - private Future<Void> closeFuture = null; - - // A counter to track how many re-acquires happened during a lock's life cycle. 
- private final AtomicInteger reacquireCount = new AtomicInteger(0); - private final StatsLogger lockStatsLogger; - private final OpStatsLogger acquireStats; - private final OpStatsLogger reacquireStats; - private final Counter internalTryRetries; - - public DistributedLock( - OrderedScheduler lockStateExecutor, - SessionLockFactory lockFactory, - String lockPath, - long lockTimeout, - StatsLogger statsLogger) { - this.lockStateExecutor = lockStateExecutor; - this.lockPath = lockPath; - this.lockTimeout = lockTimeout; - this.lockFactory = lockFactory; - - lockStatsLogger = statsLogger.scope("lock"); - acquireStats = lockStatsLogger.getOpStatsLogger("acquire"); - reacquireStats = lockStatsLogger.getOpStatsLogger("reacquire"); - internalTryRetries = lockStatsLogger.getCounter("internalTryRetries"); - } - - private LockClosedException newLockClosedException() { - return new LockClosedException(lockPath, "Lock is already closed"); - } - - private synchronized void checkLockState() throws LockingException { - if (closed) { - throw newLockClosedException(); - } - if (null != lockReacquireException) { - throw lockReacquireException; - } - } - - /** - * Asynchronously acquire the lock. Technically the try phase of this operation--which adds us to the waiter - * list--is executed synchronously, but the lock wait itself doesn't block. 
- */ - public synchronized Future<DistributedLock> asyncAcquire() { - if (null != lockAcquireFuture) { - return Future.exception(new UnexpectedException("Someone is already acquiring/acquired lock " + lockPath)); - } - final Promise<DistributedLock> promise = - new Promise<DistributedLock>(new Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable cause) { - lockStateExecutor.submit(lockPath, new Runnable() { - @Override - public void run() { - asyncClose(); - } - }); - return BoxedUnit.UNIT; - } - }); - final Stopwatch stopwatch = Stopwatch.createStarted(); - promise.addEventListener(new FutureEventListener<DistributedLock>() { - @Override - public void onSuccess(DistributedLock lock) { - acquireStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - } - @Override - public void onFailure(Throwable cause) { - acquireStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - // release the lock if fail to acquire - asyncClose(); - } - }); - this.lockAcquireFuture = promise; - lockStateExecutor.submit(lockPath, new Runnable() { - @Override - public void run() { - doAsyncAcquire(promise, lockTimeout); - } - }); - return promise; - } - - void doAsyncAcquire(final Promise<DistributedLock> acquirePromise, - final long lockTimeout) { - LOG.trace("Async Lock Acquire {}", lockPath); - try { - checkLockState(); - } catch (IOException ioe) { - FutureUtils.setException(acquirePromise, ioe); - return; - } - - lockFactory.createLock(lockPath, lockContext).addEventListener(OrderedFutureEventListener.of( - new FutureEventListener<SessionLock>() { - @Override - public void onSuccess(SessionLock lock) { - synchronized (DistributedLock.this) { - if (closed) { - LOG.info("Skipping tryLocking lock {} since it is already closed", lockPath); - FutureUtils.setException(acquirePromise, newLockClosedException()); - return; - } - } - synchronized (DistributedLock.this) { - internalLock = lock; - 
internalLock.setLockListener(DistributedLock.this); - } - asyncTryLock(lock, acquirePromise, lockTimeout); - } - - @Override - public void onFailure(Throwable cause) { - FutureUtils.setException(acquirePromise, cause); - } - }, lockStateExecutor, lockPath)); - } - - void asyncTryLock(SessionLock lock, - final Promise<DistributedLock> acquirePromise, - final long lockTimeout) { - if (null != tryLockFuture) { - tryLockFuture.cancel(); - } - tryLockFuture = lock.asyncTryLock(lockTimeout, TimeUnit.MILLISECONDS); - tryLockFuture.addEventListener(OrderedFutureEventListener.of( - new FutureEventListener<LockWaiter>() { - @Override - public void onSuccess(LockWaiter waiter) { - synchronized (DistributedLock.this) { - if (closed) { - LOG.info("Skipping acquiring lock {} since it is already closed", lockPath); - waiter.getAcquireFuture().raise(new LockingException(lockPath, "lock is already closed.")); - FutureUtils.setException(acquirePromise, newLockClosedException()); - return; - } - } - tryLockFuture = null; - lockWaiter = waiter; - waitForAcquire(waiter, acquirePromise); - } - - @Override - public void onFailure(Throwable cause) { - FutureUtils.setException(acquirePromise, cause); - } - }, lockStateExecutor, lockPath)); - } - - void waitForAcquire(final LockWaiter waiter, - final Promise<DistributedLock> acquirePromise) { - waiter.getAcquireFuture().addEventListener(OrderedFutureEventListener.of( - new FutureEventListener<Boolean>() { - @Override - public void onSuccess(Boolean acquired) { - LOG.info("{} acquired lock {}", waiter, lockPath); - if (acquired) { - FutureUtils.setValue(acquirePromise, DistributedLock.this); - } else { - FutureUtils.setException(acquirePromise, - new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner())); - } - } - - @Override - public void onFailure(Throwable cause) { - FutureUtils.setException(acquirePromise, cause); - } - }, lockStateExecutor, lockPath)); - } +public interface DistributedLock extends AsyncCloseable { /** - * 
NOTE: The {@link LockListener#onExpired()} is already executed in lock executor. + * Asynchronously acquire the lock. + * + * @return future represents the acquire result. */ - @Override - public void onExpired() { - try { - reacquireLock(false); - } catch (LockingException le) { - // should not happen - LOG.error("Locking exception on re-acquiring lock {} : ", lockPath, le); - } - } + Future<? extends DistributedLock> asyncAcquire(); /** - * Check if hold lock, if it doesn't, then re-acquire the lock. + * Check if hold lock. If it doesn't, then re-acquire the lock. * - * @throws LockingException if the lock attempt fails + * @throws LockingException if the lock attempt fails + * @see #checkOwnership() */ - public synchronized void checkOwnershipAndReacquire() throws LockingException { - if (null == lockAcquireFuture || !lockAcquireFuture.isDefined()) { - throw new LockingException(lockPath, "check ownership before acquiring"); - } - - if (haveLock()) { - return; - } - - // We may have just lost the lock because of a ZK session timeout - // not necessarily because someone else acquired the lock. - // In such cases just try to reacquire. If that fails, it will throw - reacquireLock(true); - } + void checkOwnershipAndReacquire() throws LockingException; /** - * Check if lock is held. - * If not, error out and do not reacquire. Use this in cases where there are many waiters by default - * and reacquire is unlikley to succeed. + * Check if the lock is held. If not, error out and do not re-acquire. + * Use this in cases where there are many waiters by default and re-acquire + * is unlikely to succeed. 
* - * @throws LockingException if the lock attempt fails + * @throws LockingException if we lost the ownership + * @see #checkOwnershipAndReacquire() */ - public synchronized void checkOwnership() throws LockingException { - if (null == lockAcquireFuture || !lockAcquireFuture.isDefined()) { - throw new LockingException(lockPath, "check ownership before acquiring"); - } - if (!haveLock()) { - throw new LockingException(lockPath, "Lost lock ownership"); - } - } - - @VisibleForTesting - int getReacquireCount() { - return reacquireCount.get(); - } - - @VisibleForTesting - Future<DistributedLock> getLockReacquireFuture() { - return lockReacquireFuture; - } - - @VisibleForTesting - Future<DistributedLock> getLockAcquireFuture() { - return lockAcquireFuture; - } - - @VisibleForTesting - synchronized SessionLock getInternalLock() { - return internalLock; - } - - @VisibleForTesting - LockWaiter getLockWaiter() { - return lockWaiter; - } - - synchronized boolean haveLock() { - return !closed && internalLock != null && internalLock.isLockHeld(); - } - - void closeWaiter(final LockWaiter waiter, - final Promise<Void> closePromise) { - if (null == waiter) { - interruptTryLock(tryLockFuture, closePromise); - } else { - waiter.getAcquireFuture().addEventListener(OrderedFutureEventListener.of( - new FutureEventListener<Boolean>() { - @Override - public void onSuccess(Boolean value) { - unlockInternalLock(closePromise); - } - @Override - public void onFailure(Throwable cause) { - unlockInternalLock(closePromise); - } - }, lockStateExecutor, lockPath)); - FutureUtils.cancel(waiter.getAcquireFuture()); - } - } - - void interruptTryLock(final Future<LockWaiter> tryLockFuture, - final Promise<Void> closePromise) { - if (null == tryLockFuture) { - unlockInternalLock(closePromise); - } else { - tryLockFuture.addEventListener(OrderedFutureEventListener.of( - new FutureEventListener<LockWaiter>() { - @Override - public void onSuccess(LockWaiter waiter) { - closeWaiter(waiter, 
closePromise); - } - @Override - public void onFailure(Throwable cause) { - unlockInternalLock(closePromise); - } - }, lockStateExecutor, lockPath)); - FutureUtils.cancel(tryLockFuture); - } - } - - synchronized void unlockInternalLock(final Promise<Void> closePromise) { - if (internalLock == null) { - FutureUtils.setValue(closePromise, null); - } else { - internalLock.asyncUnlock().ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - FutureUtils.setValue(closePromise, null); - return BoxedUnit.UNIT; - } - }); - } - } - - @Override - public Future<Void> asyncClose() { - final Promise<Void> closePromise; - synchronized (this) { - if (closed) { - return closeFuture; - } - closed = true; - closeFuture = closePromise = new Promise<Void>(); - } - final Promise<Void> closeWaiterFuture = new Promise<Void>(); - closeWaiterFuture.addEventListener(OrderedFutureEventListener.of(new FutureEventListener<Void>() { - @Override - public void onSuccess(Void value) { - complete(); - } - @Override - public void onFailure(Throwable cause) { - complete(); - } - - private void complete() { - FutureUtils.setValue(closePromise, null); - } - }, lockStateExecutor, lockPath)); - lockStateExecutor.submit(lockPath, new Runnable() { - @Override - public void run() { - closeWaiter(lockWaiter, closeWaiterFuture); - } - }); - return closePromise; - } - - void internalReacquireLock(final AtomicInteger numRetries, - final long lockTimeout, - final Promise<DistributedLock> reacquirePromise) { - lockStateExecutor.submit(lockPath, new Runnable() { - @Override - public void run() { - doInternalReacquireLock(numRetries, lockTimeout, reacquirePromise); - } - }); - } - - void doInternalReacquireLock(final AtomicInteger numRetries, - final long lockTimeout, - final Promise<DistributedLock> reacquirePromise) { - internalTryRetries.inc(); - Promise<DistributedLock> tryPromise = new Promise<DistributedLock>(); - tryPromise.addEventListener(new 
FutureEventListener<DistributedLock>() { - @Override - public void onSuccess(DistributedLock lock) { - FutureUtils.setValue(reacquirePromise, lock); - } - - @Override - public void onFailure(Throwable cause) { - if (cause instanceof OwnershipAcquireFailedException) { - // the lock has been acquired by others - FutureUtils.setException(reacquirePromise, cause); - } else { - if (numRetries.getAndDecrement() > 0 && !closed) { - internalReacquireLock(numRetries, lockTimeout, reacquirePromise); - } else { - FutureUtils.setException(reacquirePromise, cause); - } - } - } - }); - doAsyncAcquire(tryPromise, 0); - } - - private Future<DistributedLock> reacquireLock(boolean throwLockAcquireException) throws LockingException { - final Stopwatch stopwatch = Stopwatch.createStarted(); - Promise<DistributedLock> lockPromise; - synchronized (this) { - if (closed) { - throw newLockClosedException(); - } - if (null != lockReacquireException) { - if (throwLockAcquireException) { - throw lockReacquireException; - } else { - return null; - } - } - if (null != lockReacquireFuture) { - return lockReacquireFuture; - } - LOG.info("reacquiring lock at {}", lockPath); - lockReacquireFuture = lockPromise = new Promise<DistributedLock>(); - lockReacquireFuture.addEventListener(new FutureEventListener<DistributedLock>() { - @Override - public void onSuccess(DistributedLock lock) { - // if re-acquire successfully, clear the state. 
- synchronized (DistributedLock.this) { - lockReacquireFuture = null; - } - reacquireStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } - - @Override - public void onFailure(Throwable cause) { - synchronized (DistributedLock.this) { - if (cause instanceof LockingException) { - lockReacquireException = (LockingException) cause; - } else { - lockReacquireException = new LockingException(lockPath, - "Exception on re-acquiring lock", cause); - } - } - reacquireStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } - }); - } - reacquireCount.incrementAndGet(); - internalReacquireLock(new AtomicInteger(Integer.MAX_VALUE), 0, lockPromise); - return lockPromise; - } + void checkOwnership() throws LockingException; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/NopDistributedLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/NopDistributedLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/NopDistributedLock.java new file mode 100644 index 0000000..75e32de --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/NopDistributedLock.java @@ -0,0 +1,34 @@ +package com.twitter.distributedlog.lock; + +import com.twitter.distributedlog.exceptions.LockingException; +import com.twitter.util.Future; + +/** + * An implementation of {@link DistributedLock} which does nothing. + */ +public class NopDistributedLock implements DistributedLock { + + public static final DistributedLock INSTANCE = new NopDistributedLock(); + + private NopDistributedLock() {} + + @Override + public Future<? 
extends DistributedLock> asyncAcquire() { + return Future.value(this); + } + + @Override + public void checkOwnershipAndReacquire() throws LockingException { + // no-op + } + + @Override + public void checkOwnership() throws LockingException { + // no-op + } + + @Override + public Future<Void> asyncClose() { + return Future.Void(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLock.java index 7bfc6c1..95cd593 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLock.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLock.java @@ -70,7 +70,7 @@ public interface SessionLock { * <p> * <i>tryLock</i> here is effectively the combination of following asynchronous calls. 
* <pre> - * DistributedLock lock = ...; + * ZKDistributedLock lock = ...; * Future<LockWaiter> attemptFuture = lock.asyncTryLock(...); * * boolean acquired = waiter.waitForAcquireQuietly(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKDistributedLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKDistributedLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKDistributedLock.java new file mode 100644 index 0000000..7e9f35b --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKDistributedLock.java @@ -0,0 +1,537 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.twitter.distributedlog.lock; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import com.twitter.concurrent.AsyncSemaphore; +import com.twitter.distributedlog.exceptions.LockingException; +import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; +import com.twitter.distributedlog.exceptions.UnexpectedException; +import com.twitter.distributedlog.util.FutureUtils; +import com.twitter.distributedlog.util.FutureUtils.OrderedFutureEventListener; +import com.twitter.distributedlog.util.OrderedScheduler; +import com.twitter.util.Function; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction0; +import scala.runtime.BoxedUnit; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Distributed lock, using ZooKeeper. + * <p/> + * The lock is vulnerable to timing issues. For example, the process could + * encounter a really long GC cycle between acquiring the lock, and writing to + * a ledger. This could have timed out the lock, and another process could have + * acquired the lock and started writing to bookkeeper. Therefore other + * mechanisms are required to ensure correctness (i.e. Fencing). + * <p/> + * The lock is only allowed to acquire once. If the lock is acquired successfully, + * the caller holds the ownership until it loses the ownership either because of + * others already acquired the lock when session expired or explicitly close it. + * <p> + * The caller could use {@link #checkOwnership()} or {@link #checkOwnershipAndReacquire()} + * to check if it still holds the lock. 
If it doesn't hold the lock, the caller should + * give up the ownership and close the lock. + * <h3>Metrics</h3> + * All the lock related stats are exposed under `lock`. + * <ul> + * <li>lock/acquire: opstats. latency spent on acquiring a lock. + * <li>lock/reacquire: opstats. latency spent on re-acquiring a lock. + * <li>lock/internalTryRetries: counter. the number of retries on re-creating internal locks. + * </ul> + * Other internal lock related stats are also exposed under `lock`. See {@link SessionLock} + * for details. + */ +public class ZKDistributedLock implements LockListener, DistributedLock { + + static final Logger LOG = LoggerFactory.getLogger(ZKDistributedLock.class); + + private final SessionLockFactory lockFactory; + private final OrderedScheduler lockStateExecutor; + private final String lockPath; + private final long lockTimeout; + private final DistributedLockContext lockContext = new DistributedLockContext(); + + private final AsyncSemaphore lockSemaphore = new AsyncSemaphore(1); + // We have two lock acquire futures: + // 1. lock acquire future: for the initial acquire op + // 2. lock reacquire future: for reacquire necessary when session expires, lock is closed + private Future<ZKDistributedLock> lockAcquireFuture = null; + private Future<ZKDistributedLock> lockReacquireFuture = null; + // following variable tracking the status of acquire process + // => create (internalLock) => tryLock (tryLockFuture) => waitForAcquire (lockWaiter) + private SessionLock internalLock = null; + private Future<LockWaiter> tryLockFuture = null; + private LockWaiter lockWaiter = null; + // exception indicating if the reacquire failed + private LockingException lockReacquireException = null; + // closeFuture + private volatile boolean closed = false; + private Future<Void> closeFuture = null; + + // A counter to track how many re-acquires happened during a lock's life cycle. 
+ private final AtomicInteger reacquireCount = new AtomicInteger(0); + private final StatsLogger lockStatsLogger; + private final OpStatsLogger acquireStats; + private final OpStatsLogger reacquireStats; + private final Counter internalTryRetries; + + public ZKDistributedLock( + OrderedScheduler lockStateExecutor, + SessionLockFactory lockFactory, + String lockPath, + long lockTimeout, + StatsLogger statsLogger) { + this.lockStateExecutor = lockStateExecutor; + this.lockPath = lockPath; + this.lockTimeout = lockTimeout; + this.lockFactory = lockFactory; + + lockStatsLogger = statsLogger.scope("lock"); + acquireStats = lockStatsLogger.getOpStatsLogger("acquire"); + reacquireStats = lockStatsLogger.getOpStatsLogger("reacquire"); + internalTryRetries = lockStatsLogger.getCounter("internalTryRetries"); + } + + private LockClosedException newLockClosedException() { + return new LockClosedException(lockPath, "Lock is already closed"); + } + + private synchronized void checkLockState() throws LockingException { + if (closed) { + throw newLockClosedException(); + } + if (null != lockReacquireException) { + throw lockReacquireException; + } + } + + /** + * Asynchronously acquire the lock. Technically the try phase of this operation--which adds us to the waiter + * list--is executed synchronously, but the lock wait itself doesn't block. 
+ */ + public synchronized Future<ZKDistributedLock> asyncAcquire() { + if (null != lockAcquireFuture) { + return Future.exception(new UnexpectedException("Someone is already acquiring/acquired lock " + lockPath)); + } + final Promise<ZKDistributedLock> promise = + new Promise<ZKDistributedLock>(new Function<Throwable, BoxedUnit>() { + @Override + public BoxedUnit apply(Throwable cause) { + lockStateExecutor.submit(lockPath, new Runnable() { + @Override + public void run() { + asyncClose(); + } + }); + return BoxedUnit.UNIT; + } + }); + final Stopwatch stopwatch = Stopwatch.createStarted(); + promise.addEventListener(new FutureEventListener<ZKDistributedLock>() { + @Override + public void onSuccess(ZKDistributedLock lock) { + acquireStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + } + @Override + public void onFailure(Throwable cause) { + acquireStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + // release the lock if fail to acquire + asyncClose(); + } + }); + this.lockAcquireFuture = promise; + lockStateExecutor.submit(lockPath, new Runnable() { + @Override + public void run() { + doAsyncAcquireWithSemaphore(promise, lockTimeout); + } + }); + return promise; + } + + void doAsyncAcquireWithSemaphore(final Promise<ZKDistributedLock> acquirePromise, + final long lockTimeout) { + lockSemaphore.acquireAndRun(new AbstractFunction0<Future<ZKDistributedLock>>() { + @Override + public Future<ZKDistributedLock> apply() { + doAsyncAcquire(acquirePromise, lockTimeout); + return acquirePromise; + } + }); + } + + void doAsyncAcquire(final Promise<ZKDistributedLock> acquirePromise, + final long lockTimeout) { + LOG.trace("Async Lock Acquire {}", lockPath); + try { + checkLockState(); + } catch (IOException ioe) { + FutureUtils.setException(acquirePromise, ioe); + return; + } + + if (haveLock()) { + // it already hold the lock + FutureUtils.setValue(acquirePromise, this); + return; + } + + 
lockFactory.createLock(lockPath, lockContext).addEventListener(OrderedFutureEventListener.of( + new FutureEventListener<SessionLock>() { + @Override + public void onSuccess(SessionLock lock) { + synchronized (ZKDistributedLock.this) { + if (closed) { + LOG.info("Skipping tryLocking lock {} since it is already closed", lockPath); + FutureUtils.setException(acquirePromise, newLockClosedException()); + return; + } + } + synchronized (ZKDistributedLock.this) { + internalLock = lock; + internalLock.setLockListener(ZKDistributedLock.this); + } + asyncTryLock(lock, acquirePromise, lockTimeout); + } + + @Override + public void onFailure(Throwable cause) { + FutureUtils.setException(acquirePromise, cause); + } + }, lockStateExecutor, lockPath)); + } + + void asyncTryLock(SessionLock lock, + final Promise<ZKDistributedLock> acquirePromise, + final long lockTimeout) { + if (null != tryLockFuture) { + tryLockFuture.cancel(); + } + tryLockFuture = lock.asyncTryLock(lockTimeout, TimeUnit.MILLISECONDS); + tryLockFuture.addEventListener(OrderedFutureEventListener.of( + new FutureEventListener<LockWaiter>() { + @Override + public void onSuccess(LockWaiter waiter) { + synchronized (ZKDistributedLock.this) { + if (closed) { + LOG.info("Skipping acquiring lock {} since it is already closed", lockPath); + waiter.getAcquireFuture().raise(new LockingException(lockPath, "lock is already closed.")); + FutureUtils.setException(acquirePromise, newLockClosedException()); + return; + } + } + tryLockFuture = null; + lockWaiter = waiter; + waitForAcquire(waiter, acquirePromise); + } + + @Override + public void onFailure(Throwable cause) { + FutureUtils.setException(acquirePromise, cause); + } + }, lockStateExecutor, lockPath)); + } + + void waitForAcquire(final LockWaiter waiter, + final Promise<ZKDistributedLock> acquirePromise) { + waiter.getAcquireFuture().addEventListener(OrderedFutureEventListener.of( + new FutureEventListener<Boolean>() { + @Override + public void onSuccess(Boolean 
acquired) { + LOG.info("{} acquired lock {}", waiter, lockPath); + if (acquired) { + FutureUtils.setValue(acquirePromise, ZKDistributedLock.this); + } else { + FutureUtils.setException(acquirePromise, + new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner())); + } + } + + @Override + public void onFailure(Throwable cause) { + FutureUtils.setException(acquirePromise, cause); + } + }, lockStateExecutor, lockPath)); + } + + /** + * NOTE: The {@link LockListener#onExpired()} is already executed in lock executor. + */ + @Override + public void onExpired() { + try { + reacquireLock(false); + } catch (LockingException le) { + // should not happen + LOG.error("Locking exception on re-acquiring lock {} : ", lockPath, le); + } + } + + /** + * Check if hold lock, if it doesn't, then re-acquire the lock. + * + * @throws LockingException if the lock attempt fails + */ + public synchronized void checkOwnershipAndReacquire() throws LockingException { + if (null == lockAcquireFuture || !lockAcquireFuture.isDefined()) { + throw new LockingException(lockPath, "check ownership before acquiring"); + } + + if (haveLock()) { + return; + } + + // We may have just lost the lock because of a ZK session timeout + // not necessarily because someone else acquired the lock. + // In such cases just try to reacquire. If that fails, it will throw + reacquireLock(true); + } + + /** + * Check if lock is held. + * If not, error out and do not reacquire. Use this in cases where there are many waiters by default + * and reacquire is unlikley to succeed. 
+ * + * @throws LockingException if the lock attempt fails + */ + public synchronized void checkOwnership() throws LockingException { + if (null == lockAcquireFuture || !lockAcquireFuture.isDefined()) { + throw new LockingException(lockPath, "check ownership before acquiring"); + } + if (!haveLock()) { + throw new LockingException(lockPath, "Lost lock ownership"); + } + } + + @VisibleForTesting + int getReacquireCount() { + return reacquireCount.get(); + } + + @VisibleForTesting + Future<ZKDistributedLock> getLockReacquireFuture() { + return lockReacquireFuture; + } + + @VisibleForTesting + Future<ZKDistributedLock> getLockAcquireFuture() { + return lockAcquireFuture; + } + + @VisibleForTesting + synchronized SessionLock getInternalLock() { + return internalLock; + } + + @VisibleForTesting + LockWaiter getLockWaiter() { + return lockWaiter; + } + + synchronized boolean haveLock() { + return !closed && internalLock != null && internalLock.isLockHeld(); + } + + void closeWaiter(final LockWaiter waiter, + final Promise<Void> closePromise) { + if (null == waiter) { + interruptTryLock(tryLockFuture, closePromise); + } else { + waiter.getAcquireFuture().addEventListener(OrderedFutureEventListener.of( + new FutureEventListener<Boolean>() { + @Override + public void onSuccess(Boolean value) { + unlockInternalLock(closePromise); + } + @Override + public void onFailure(Throwable cause) { + unlockInternalLock(closePromise); + } + }, lockStateExecutor, lockPath)); + FutureUtils.cancel(waiter.getAcquireFuture()); + } + } + + void interruptTryLock(final Future<LockWaiter> tryLockFuture, + final Promise<Void> closePromise) { + if (null == tryLockFuture) { + unlockInternalLock(closePromise); + } else { + tryLockFuture.addEventListener(OrderedFutureEventListener.of( + new FutureEventListener<LockWaiter>() { + @Override + public void onSuccess(LockWaiter waiter) { + closeWaiter(waiter, closePromise); + } + @Override + public void onFailure(Throwable cause) { + 
unlockInternalLock(closePromise); + } + }, lockStateExecutor, lockPath)); + FutureUtils.cancel(tryLockFuture); + } + } + + synchronized void unlockInternalLock(final Promise<Void> closePromise) { + if (internalLock == null) { + FutureUtils.setValue(closePromise, null); + } else { + internalLock.asyncUnlock().ensure(new AbstractFunction0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + FutureUtils.setValue(closePromise, null); + return BoxedUnit.UNIT; + } + }); + } + } + + @Override + public Future<Void> asyncClose() { + final Promise<Void> closePromise; + synchronized (this) { + if (closed) { + return closeFuture; + } + closed = true; + closeFuture = closePromise = new Promise<Void>(); + } + final Promise<Void> closeWaiterFuture = new Promise<Void>(); + closeWaiterFuture.addEventListener(OrderedFutureEventListener.of(new FutureEventListener<Void>() { + @Override + public void onSuccess(Void value) { + complete(); + } + @Override + public void onFailure(Throwable cause) { + complete(); + } + + private void complete() { + FutureUtils.setValue(closePromise, null); + } + }, lockStateExecutor, lockPath)); + lockStateExecutor.submit(lockPath, new Runnable() { + @Override + public void run() { + closeWaiter(lockWaiter, closeWaiterFuture); + } + }); + return closePromise; + } + + void internalReacquireLock(final AtomicInteger numRetries, + final long lockTimeout, + final Promise<ZKDistributedLock> reacquirePromise) { + lockStateExecutor.submit(lockPath, new Runnable() { + @Override + public void run() { + doInternalReacquireLock(numRetries, lockTimeout, reacquirePromise); + } + }); + } + + void doInternalReacquireLock(final AtomicInteger numRetries, + final long lockTimeout, + final Promise<ZKDistributedLock> reacquirePromise) { + internalTryRetries.inc(); + Promise<ZKDistributedLock> tryPromise = new Promise<ZKDistributedLock>(); + tryPromise.addEventListener(new FutureEventListener<ZKDistributedLock>() { + @Override + public void onSuccess(ZKDistributedLock 
lock) { + FutureUtils.setValue(reacquirePromise, lock); + } + + @Override + public void onFailure(Throwable cause) { + if (cause instanceof OwnershipAcquireFailedException) { + // the lock has been acquired by others + FutureUtils.setException(reacquirePromise, cause); + } else { + if (numRetries.getAndDecrement() > 0 && !closed) { + internalReacquireLock(numRetries, lockTimeout, reacquirePromise); + } else { + FutureUtils.setException(reacquirePromise, cause); + } + } + } + }); + doAsyncAcquireWithSemaphore(tryPromise, 0); + } + + private Future<ZKDistributedLock> reacquireLock(boolean throwLockAcquireException) throws LockingException { + final Stopwatch stopwatch = Stopwatch.createStarted(); + Promise<ZKDistributedLock> lockPromise; + synchronized (this) { + if (closed) { + throw newLockClosedException(); + } + if (null != lockReacquireException) { + if (throwLockAcquireException) { + throw lockReacquireException; + } else { + return null; + } + } + if (null != lockReacquireFuture) { + return lockReacquireFuture; + } + LOG.info("reacquiring lock at {}", lockPath); + lockReacquireFuture = lockPromise = new Promise<ZKDistributedLock>(); + lockReacquireFuture.addEventListener(new FutureEventListener<ZKDistributedLock>() { + @Override + public void onSuccess(ZKDistributedLock lock) { + // if re-acquire successfully, clear the state. 
+ synchronized (ZKDistributedLock.this) { + lockReacquireFuture = null; + } + reacquireStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + } + + @Override + public void onFailure(Throwable cause) { + synchronized (ZKDistributedLock.this) { + if (cause instanceof LockingException) { + lockReacquireException = (LockingException) cause; + } else { + lockReacquireException = new LockingException(lockPath, + "Exception on re-acquiring lock", cause); + } + } + reacquireStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + } + }); + } + reacquireCount.incrementAndGet(); + internalReacquireLock(new AtomicInteger(Integer.MAX_VALUE), 0, lockPromise); + return lockPromise; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java index 2c13b06..3355c9b 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java @@ -536,6 +536,7 @@ public class DLMTestUtil { } catch (org.apache.commons.configuration.ConfigurationException ex) { LOG.warn("loading conf failed", ex); } + conf.setAllowLoopback(true); return conf; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java index 06cf079..787f74f 100644 --- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java @@ -51,7 +51,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Stopwatch; -import com.twitter.distributedlog.annotations.DistributedLogAnnotations; import com.twitter.distributedlog.exceptions.DLIllegalStateException; import com.twitter.distributedlog.exceptions.EndOfStreamException; import com.twitter.distributedlog.exceptions.IdleReaderException; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java index 0c8ca9a..3fa3e7d 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import com.twitter.distributedlog.exceptions.AlreadyTruncatedTransactionException; +import com.twitter.distributedlog.exceptions.BKTransmitException; import com.twitter.distributedlog.exceptions.LogEmptyException; import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.LogReadException; @@ -35,6 +36,7 @@ import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.Utils; +import 
org.apache.bookkeeper.client.BKException; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -322,6 +324,28 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { } @Test(timeout = 60000) + public void testTwoWritersOnLockDisabled() throws Exception { + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(conf); + confLocal.setOutputBufferSize(0); + confLocal.setWriteLockEnabled(false); + String name = "distrlog-two-writers-lock-disabled"; + DistributedLogManager manager = createNewDLM(confLocal, name); + AsyncLogWriter writer1 = FutureUtils.result(manager.openAsyncLogWriter()); + FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(1L))); + AsyncLogWriter writer2 = FutureUtils.result(manager.openAsyncLogWriter()); + FutureUtils.result(writer2.write(DLMTestUtil.getLogRecordInstance(2L))); + + // write a record to writer 1 again + try { + FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(3L))); + fail("Should fail writing record to writer 1 again as writer 2 took over the ownership"); + } catch (BKTransmitException bkte) { + assertEquals(BKException.Code.LedgerFencedException, bkte.getBKResultCode()); + } + } + + @Test(timeout = 60000) public void testSimpleRead() throws Exception { String name = "distrlog-simpleread"; DistributedLogManager dlm = createNewDLM(conf, name); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java index 0baf9fe..43e55e4 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java +++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java @@ -24,7 +24,7 @@ import com.twitter.distributedlog.exceptions.WriteException; import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter; import com.twitter.distributedlog.io.Abortables; import com.twitter.distributedlog.lock.SessionLockFactory; -import com.twitter.distributedlog.lock.DistributedLock; +import com.twitter.distributedlog.lock.ZKDistributedLock; import com.twitter.distributedlog.lock.ZKSessionLockFactory; import com.twitter.distributedlog.metadata.BKDLConfig; import com.twitter.distributedlog.util.ConfUtils; @@ -128,9 +128,9 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { return confLocal; } - private DistributedLock createLock(String path, - ZooKeeperClient zkClient, - boolean acquireLock) + private ZKDistributedLock createLock(String path, + ZooKeeperClient zkClient, + boolean acquireLock) throws Exception { try { Await.result(Utils.zkAsyncCreateFullPathOptimistic(zkClient, path, new byte[0], @@ -147,7 +147,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { conf.getZKSessionTimeoutMilliseconds(), NullStatsLogger.INSTANCE ); - DistributedLock lock = new DistributedLock( + ZKDistributedLock lock = new ZKDistributedLock( lockStateExecutor, lockFactory, path, @@ -161,7 +161,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { } private void closeWriterAndLock(BKLogSegmentWriter writer, - DistributedLock lock) + ZKDistributedLock lock) throws IOException { try { FutureUtils.result(writer.asyncClose()); @@ -171,7 +171,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { } private void abortWriterAndLock(BKLogSegmentWriter writer, - DistributedLock lock) + ZKDistributedLock lock) throws IOException { try { Abortables.abort(writer, false); @@ -183,7 +183,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { private BKLogSegmentWriter 
createLogSegmentWriter(DistributedLogConfiguration conf, long logSegmentSequenceNumber, long startTxId, - DistributedLock lock) throws Exception { + ZKDistributedLock lock) throws Exception { LedgerHandle lh = bkc.get().createLedger(3, 2, 2, BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8)); return new BKLogSegmentWriter( @@ -230,12 +230,12 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { confLocal.setImmediateFlushEnabled(false); confLocal.setOutputBufferSize(Integer.MAX_VALUE); confLocal.setPeriodicFlushFrequencyMilliSeconds(0); - DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); + ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock); // Use another lock to wait for writer releasing lock - DistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false); - Future<DistributedLock> lockFuture0 = lock0.asyncAcquire(); + ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false); + Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire(); // add 10 records int numRecords = 10; List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords); @@ -292,12 +292,12 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { confLocal.setImmediateFlushEnabled(false); confLocal.setOutputBufferSize(Integer.MAX_VALUE); confLocal.setPeriodicFlushFrequencyMilliSeconds(0); - DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); + ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock); // Use another lock to wait for writer releasing lock - DistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false); - Future<DistributedLock> lockFuture0 = 
lock0.asyncAcquire(); + ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false); + Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire(); // add 10 records int numRecords = 10; List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords); @@ -368,12 +368,12 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { confLocal.setImmediateFlushEnabled(false); confLocal.setOutputBufferSize(Integer.MAX_VALUE); confLocal.setPeriodicFlushFrequencyMilliSeconds(0); - DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); + ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock); // Use another lock to wait for writer releasing lock - DistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false); - Future<DistributedLock> lockFuture0 = lock0.asyncAcquire(); + ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false); + Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire(); // add 10 records int numRecords = 10; List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords); @@ -440,12 +440,12 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { confLocal.setImmediateFlushEnabled(false); confLocal.setOutputBufferSize(Integer.MAX_VALUE); confLocal.setPeriodicFlushFrequencyMilliSeconds(0); - DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); + ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock); // Use another lock to wait for writer releasing lock - DistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false); - Future<DistributedLock> lockFuture0 = lock0.asyncAcquire(); + ZKDistributedLock lock0 = 
createLock("/test/lock-" + runtime.getMethodName(), zkc0, false); + Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire(); // add 10 records int numRecords = 10; List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords); @@ -512,12 +512,12 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { confLocal.setImmediateFlushEnabled(false); confLocal.setOutputBufferSize(Integer.MAX_VALUE); confLocal.setPeriodicFlushFrequencyMilliSeconds(0); - DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); + ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock); // Use another lock to wait for writer releasing lock - DistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false); - Future<DistributedLock> lockFuture0 = lock0.asyncAcquire(); + ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false); + Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire(); // add 10 records int numRecords = 10; List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords); @@ -621,7 +621,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { confLocal.setImmediateFlushEnabled(false); confLocal.setOutputBufferSize(Integer.MAX_VALUE); confLocal.setPeriodicFlushFrequencyMilliSeconds(0); - DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); + ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock); // add 10 records @@ -685,7 +685,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { confLocal.setOutputBufferSize(Integer.MAX_VALUE); confLocal.setPeriodicFlushFrequencyMilliSeconds(0); confLocal.setDurableWriteEnabled(false); - DistributedLock lock = 
createLock("/test/lock-" + runtime.getMethodName(), zkc, true); + ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock); @@ -713,7 +713,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { confLocal.setOutputBufferSize(Integer.MAX_VALUE); confLocal.setPeriodicFlushFrequencyMilliSeconds(0); confLocal.setDurableWriteEnabled(false); - DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); + ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock); @@ -741,7 +741,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { confLocal.setOutputBufferSize(Integer.MAX_VALUE); confLocal.setPeriodicFlushFrequencyMilliSeconds(0); confLocal.setDurableWriteEnabled(false); - DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); + ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock); @@ -780,7 +780,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { confLocal.setOutputBufferSize(Integer.MAX_VALUE); confLocal.setPeriodicFlushFrequencyMilliSeconds(0); confLocal.setDurableWriteEnabled(false); - DistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); + ZKDistributedLock lock = createLock("/test/lock-" + runtime.getMethodName(), zkc, true); BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/89613fb7/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java ---------------------------------------------------------------------- diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java index d348492..74bafb3 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java @@ -37,7 +37,6 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,17 +71,20 @@ public class TestDistributedLogBase { @BeforeClass public static void setupCluster() throws Exception { - boolean success = false; - int retries = 0; File zkTmpDir = IOUtils.createTempDir("zookeeper", "distrlog"); tmpDirs.add(zkTmpDir); Pair<ZooKeeperServerShim, Integer> serverAndPort = LocalDLMEmulator.runZookeeperOnAnyPort(zkTmpDir); zks = serverAndPort.getLeft(); zkPort = serverAndPort.getRight(); - bkutil = new LocalDLMEmulator(numBookies, "127.0.0.1", zkPort, DLMTestUtil.loadTestBkConf()); + bkutil = LocalDLMEmulator.newBuilder() + .numBookies(numBookies) + .zkHost("127.0.0.1") + .zkPort(zkPort) + .serverConf(DLMTestUtil.loadTestBkConf()) + .shouldStartZK(false) + .build(); bkutil.start(); zkServers = "127.0.0.1:" + zkPort; - success = true; } @AfterClass
