http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKDistributedLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKDistributedLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKDistributedLock.java deleted file mode 100644 index 4d0de7f..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKDistributedLock.java +++ /dev/null @@ -1,537 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.lock; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Stopwatch; -import com.twitter.concurrent.AsyncSemaphore; -import com.twitter.distributedlog.exceptions.LockingException; -import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.FutureUtils.OrderedFutureEventListener; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction0; -import scala.runtime.BoxedUnit; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Distributed lock, using ZooKeeper. - * <p/> - * The lock is vulnerable to timing issues. For example, the process could - * encounter a really long GC cycle between acquiring the lock, and writing to - * a ledger. This could have timed out the lock, and another process could have - * acquired the lock and started writing to bookkeeper. Therefore other - * mechanisms are required to ensure correctness (i.e. Fencing). - * <p/> - * The lock is only allowed to acquire once. If the lock is acquired successfully, - * the caller holds the ownership until it loses the ownership either because of - * others already acquired the lock when session expired or explicitly close it. - * <p> - * The caller could use {@link #checkOwnership()} or {@link #checkOwnershipAndReacquire()} - * to check if it still holds the lock. 
If it doesn't hold the lock, the caller should
 * give up the ownership and close the lock.
 * <h3>Metrics</h3>
 * All the lock related stats are exposed under `lock`.
 * <ul>
 * <li>lock/acquire: opstats. latency spent on acquiring a lock.
 * <li>lock/reacquire: opstats. latency spent on re-acquiring a lock.
 * <li>lock/internalTryRetries: counter. the number of retries on re-creating internal locks.
 * </ul>
 * Other internal lock related stats are also exposed under `lock`. See {@link SessionLock}
 * for details.
 */
public class ZKDistributedLock implements LockListener, DistributedLock {

    static final Logger LOG = LoggerFactory.getLogger(ZKDistributedLock.class);

    private final SessionLockFactory lockFactory;
    private final OrderedScheduler lockStateExecutor;
    private final String lockPath;
    private final long lockTimeout;
    private final DistributedLockContext lockContext = new DistributedLockContext();

    // Single permit: acquire and re-acquire attempts on this lock run one at a time.
    private final AsyncSemaphore lockSemaphore = new AsyncSemaphore(1);
    // We have two lock acquire futures:
    // 1. lock acquire future: for the initial acquire op
    // 2. lock reacquire future: for reacquire necessary when session expires, lock is closed
    private Future<ZKDistributedLock> lockAcquireFuture = null;
    private Future<ZKDistributedLock> lockReacquireFuture = null;
    // following variable tracking the status of acquire process
    //   => create (internalLock) => tryLock (tryLockFuture) => waitForAcquire (lockWaiter)
    private SessionLock internalLock = null;
    private Future<LockWaiter> tryLockFuture = null;
    private LockWaiter lockWaiter = null;
    // exception indicating if the reacquire failed
    private LockingException lockReacquireException = null;
    // closeFuture
    private volatile boolean closed = false;
    private Future<Void> closeFuture = null;

    // A counter to track how many re-acquires happened during a lock's life cycle.
    private final AtomicInteger reacquireCount = new AtomicInteger(0);
    private final StatsLogger lockStatsLogger;
    private final OpStatsLogger acquireStats;
    private final OpStatsLogger reacquireStats;
    private final Counter internalTryRetries;

    /**
     * Create a distributed lock.
     *
     * @param lockStateExecutor executor on which lock state changes are run, keyed by <i>lockPath</i>
     * @param lockFactory factory used to create the underlying {@link SessionLock}
     * @param lockPath path identifying the lock
     * @param lockTimeout timeout passed to the try-lock operation of the initial acquire, in milliseconds
     * @param statsLogger parent stats logger; stats are registered under its `lock` scope
     */
    public ZKDistributedLock(
            OrderedScheduler lockStateExecutor,
            SessionLockFactory lockFactory,
            String lockPath,
            long lockTimeout,
            StatsLogger statsLogger) {
        this.lockStateExecutor = lockStateExecutor;
        this.lockPath = lockPath;
        this.lockTimeout = lockTimeout;
        this.lockFactory = lockFactory;

        lockStatsLogger = statsLogger.scope("lock");
        acquireStats = lockStatsLogger.getOpStatsLogger("acquire");
        reacquireStats = lockStatsLogger.getOpStatsLogger("reacquire");
        internalTryRetries = lockStatsLogger.getCounter("internalTryRetries");
    }

    private LockClosedException newLockClosedException() {
        return new LockClosedException(lockPath, "Lock is already closed");
    }

    /**
     * Fail fast if the lock can no longer be used.
     *
     * @throws LockingException if the lock is closed or a previous re-acquire has failed
     */
    private synchronized void checkLockState() throws LockingException {
        if (closed) {
            throw newLockClosedException();
        }
        if (null != lockReacquireException) {
            throw lockReacquireException;
        }
    }

    /**
     * Asynchronously acquire the lock. The acquire procedure is submitted to the lock state
     * executor; the returned future is satisfied once the lock is acquired or the attempt fails.
     * <p>
     * The lock can be acquired only once: a second call returns a future failed with
     * {@link UnexpectedException}. If the acquire fails, or the returned future is interrupted,
     * the lock is closed.
     *
     * @return future representing the result of the acquire
     */
    public synchronized Future<ZKDistributedLock> asyncAcquire() {
        if (null != lockAcquireFuture) {
            return Future.exception(new UnexpectedException("Someone is already acquiring/acquired lock " + lockPath));
        }
        // interrupt handler: closing the lock is done in the lock state executor
        final Promise<ZKDistributedLock> promise =
                new Promise<ZKDistributedLock>(new Function<Throwable, BoxedUnit>() {
                    @Override
                    public BoxedUnit apply(Throwable cause) {
                        lockStateExecutor.submit(lockPath, new Runnable() {
                            @Override
                            public void run() {
                                asyncClose();
                            }
                        });
                        return BoxedUnit.UNIT;
                    }
                });
        final Stopwatch stopwatch = Stopwatch.createStarted();
        promise.addEventListener(new FutureEventListener<ZKDistributedLock>() {
            @Override
            public void onSuccess(ZKDistributedLock lock) {
                acquireStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
            }
            @Override
            public void onFailure(Throwable cause) {
                acquireStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
                // release the lock if fail to acquire
                asyncClose();
            }
        });
        this.lockAcquireFuture = promise;
        lockStateExecutor.submit(lockPath, new Runnable() {
            @Override
            public void run() {
                doAsyncAcquireWithSemaphore(promise, lockTimeout);
            }
        });
        return promise;
    }

    /**
     * Run {@link #doAsyncAcquire(Promise, long)} under the lock semaphore. The permit is tied to
     * <i>acquirePromise</i>, which is the future handed back to the semaphore.
     */
    void doAsyncAcquireWithSemaphore(final Promise<ZKDistributedLock> acquirePromise,
                                     final long lockTimeout) {
        lockSemaphore.acquireAndRun(new AbstractFunction0<Future<ZKDistributedLock>>() {
            @Override
            public Future<ZKDistributedLock> apply() {
                doAsyncAcquire(acquirePromise, lockTimeout);
                return acquirePromise;
            }
        });
    }

    /**
     * First step of the acquire procedure: create the internal session lock, then try locking it.
     *
     * @param acquirePromise promise to satisfy with the outcome of the acquire
     * @param lockTimeout timeout of the try-lock operation, in milliseconds
     */
    void doAsyncAcquire(final Promise<ZKDistributedLock> acquirePromise,
                        final long lockTimeout) {
        LOG.trace("Async Lock Acquire {}", lockPath);
        try {
            checkLockState();
        } catch (IOException ioe) {
            FutureUtils.setException(acquirePromise, ioe);
            return;
        }

        if (haveLock()) {
            // it already hold the lock
            FutureUtils.setValue(acquirePromise, this);
            return;
        }

        lockFactory.createLock(lockPath, lockContext).addEventListener(OrderedFutureEventListener.of(
                new FutureEventListener<SessionLock>() {
            @Override
            public void onSuccess(SessionLock lock) {
                // check the closed flag and publish the internal lock in one critical section,
                // so a close cannot slip in between the check and the assignment.
                synchronized (ZKDistributedLock.this) {
                    if (closed) {
                        LOG.info("Skipping tryLocking lock {} since it is already closed", lockPath);
                        FutureUtils.setException(acquirePromise, newLockClosedException());
                        return;
                    }
                    internalLock = lock;
                    internalLock.setLockListener(ZKDistributedLock.this);
                }
                asyncTryLock(lock, acquirePromise, lockTimeout);
            }

            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.setException(acquirePromise, cause);
            }
        }, lockStateExecutor, lockPath));
    }

    /**
     * Second step of the acquire procedure: try locking the given session lock. Any try-lock
     * still tracked in {@code tryLockFuture} is cancelled first.
     *
     * @param lock session lock to try
     * @param acquirePromise promise to satisfy with the outcome of the acquire
     * @param lockTimeout timeout of the try-lock operation, in milliseconds
     */
    void asyncTryLock(SessionLock lock,
                      final Promise<ZKDistributedLock> acquirePromise,
                      final long lockTimeout) {
        if (null != tryLockFuture) {
            tryLockFuture.cancel();
        }
        tryLockFuture = lock.asyncTryLock(lockTimeout, TimeUnit.MILLISECONDS);
        tryLockFuture.addEventListener(OrderedFutureEventListener.of(
                new FutureEventListener<LockWaiter>() {
                    @Override
                    public void onSuccess(LockWaiter waiter) {
                        synchronized (ZKDistributedLock.this) {
                            if (closed) {
                                LOG.info("Skipping acquiring lock {} since it is already closed", lockPath);
                                waiter.getAcquireFuture().raise(new LockingException(lockPath, "lock is already closed."));
                                FutureUtils.setException(acquirePromise, newLockClosedException());
                                return;
                            }
                        }
                        tryLockFuture = null;
                        lockWaiter = waiter;
                        waitForAcquire(waiter, acquirePromise);
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        FutureUtils.setException(acquirePromise, cause);
                    }
                }, lockStateExecutor, lockPath));
    }

    /**
     * Last step of the acquire procedure: wait for the waiter's acquire future and propagate
     * its outcome to <i>acquirePromise</i>.
     *
     * @param waiter lock waiter returned by the try-lock operation
     * @param acquirePromise promise to satisfy with the outcome of the acquire
     */
    void waitForAcquire(final LockWaiter waiter,
                        final Promise<ZKDistributedLock> acquirePromise) {
        waiter.getAcquireFuture().addEventListener(OrderedFutureEventListener.of(
                new FutureEventListener<Boolean>() {
                    @Override
                    public void onSuccess(Boolean acquired) {
                        if (acquired) {
                            // only report "acquired" when the ownership was really granted
                            LOG.info("{} acquired lock {}", waiter, lockPath);
                            FutureUtils.setValue(acquirePromise, ZKDistributedLock.this);
                        } else {
                            LOG.info("{} failed to acquire lock {}", waiter, lockPath);
                            FutureUtils.setException(acquirePromise,
                                    new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner()));
                        }
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        FutureUtils.setException(acquirePromise, cause);
                    }
                }, lockStateExecutor, lockPath));
    }

    /**
     * NOTE: The {@link LockListener#onExpired()} is already executed in lock executor.
     */
    @Override
    public void onExpired() {
        try {
            reacquireLock(false);
        } catch (LockingException le) {
            // should not happen
            LOG.error("Locking exception on re-acquiring lock {} : ", lockPath, le);
        }
    }

    /**
     * Check if hold lock, if it doesn't, then re-acquire the lock.
     * <p>
     * The re-acquire itself is asynchronous: this method only starts it (or joins one that is
     * already in progress) and returns.
     *
     * @throws LockingException if called before the initial acquire completed, if the lock is
     *          closed, or if a previous re-acquire attempt has already failed
     */
    public synchronized void checkOwnershipAndReacquire() throws LockingException {
        if (null == lockAcquireFuture || !lockAcquireFuture.isDefined()) {
            throw new LockingException(lockPath, "check ownership before acquiring");
        }

        if (haveLock()) {
            return;
        }

        // We may have just lost the lock because of a ZK session timeout
        // not necessarily because someone else acquired the lock.
        // In such cases just try to reacquire. A failure of an earlier
        // reacquire attempt is rethrown from here.
        reacquireLock(true);
    }

    /**
     * Check if lock is held.
     * If not, error out and do not reacquire. Use this in cases where there are many waiters by default
     * and reacquire is unlikely to succeed.
     *
     * @throws LockingException if called before the initial acquire completed, or if the lock
     *          is not held
     */
    public synchronized void checkOwnership() throws LockingException {
        if (null == lockAcquireFuture || !lockAcquireFuture.isDefined()) {
            throw new LockingException(lockPath, "check ownership before acquiring");
        }
        if (!haveLock()) {
            throw new LockingException(lockPath, "Lost lock ownership");
        }
    }

    @VisibleForTesting
    int getReacquireCount() {
        return reacquireCount.get();
    }

    @VisibleForTesting
    synchronized Future<ZKDistributedLock> getLockReacquireFuture() {
        return lockReacquireFuture;
    }

    @VisibleForTesting
    synchronized Future<ZKDistributedLock> getLockAcquireFuture() {
        return lockAcquireFuture;
    }

    @VisibleForTesting
    synchronized SessionLock getInternalLock() {
        return internalLock;
    }

    @VisibleForTesting
    LockWaiter getLockWaiter() {
        return lockWaiter;
    }

    /**
     * @return true if this lock is not closed and its internal session lock is currently held
     */
    synchronized boolean haveLock() {
        return !closed && internalLock != null && internalLock.isLockHeld();
    }

    /**
     * Close step 1: cancel the pending waiter (if any) and unlock the internal lock once the
     * waiter's acquire future is resolved. Without a waiter, fall back to interrupting the
     * outstanding try-lock.
     *
     * @param waiter lock waiter to cancel, may be null
     * @param closePromise promise to satisfy when the internal lock is released
     */
    void closeWaiter(final LockWaiter waiter,
                     final Promise<Void> closePromise) {
        if (null == waiter) {
            interruptTryLock(tryLockFuture, closePromise);
        } else {
            waiter.getAcquireFuture().addEventListener(OrderedFutureEventListener.of(
                    new FutureEventListener<Boolean>() {
                        @Override
                        public void onSuccess(Boolean value) {
                            unlockInternalLock(closePromise);
                        }
                        @Override
                        public void onFailure(Throwable cause) {
                            unlockInternalLock(closePromise);
                        }
                    }, lockStateExecutor, lockPath));
            FutureUtils.cancel(waiter.getAcquireFuture());
        }
    }

    /**
     * Close step 2: cancel the outstanding try-lock (if any). If the try-lock nevertheless
     * produced a waiter, that waiter is closed; otherwise the internal lock is unlocked.
     *
     * @param tryLockFuture outstanding try-lock future, may be null
     * @param closePromise promise to satisfy when the internal lock is released
     */
    void interruptTryLock(final Future<LockWaiter> tryLockFuture,
                          final Promise<Void> closePromise) {
        if (null == tryLockFuture) {
            unlockInternalLock(closePromise);
        } else {
            tryLockFuture.addEventListener(OrderedFutureEventListener.of(
                    new FutureEventListener<LockWaiter>() {
                        @Override
                        public void onSuccess(LockWaiter waiter) {
                            closeWaiter(waiter, closePromise);
                        }
                        @Override
                        public void onFailure(Throwable cause) {
                            unlockInternalLock(closePromise);
                        }
                    }, lockStateExecutor, lockPath));
            FutureUtils.cancel(tryLockFuture);
        }
    }

    /**
     * Close step 3: unlock the internal session lock, if one was created. <i>closePromise</i>
     * is satisfied whether or not the unlock succeeds.
     *
     * @param closePromise promise to satisfy when the unlock attempt is finished
     */
    synchronized void unlockInternalLock(final Promise<Void> closePromise) {
        if (internalLock == null) {
            FutureUtils.setValue(closePromise, null);
        } else {
            internalLock.asyncUnlock().ensure(new AbstractFunction0<BoxedUnit>() {
                @Override
                public BoxedUnit apply() {
                    FutureUtils.setValue(closePromise, null);
                    return BoxedUnit.UNIT;
                }
            });
        }
    }

    /**
     * Close the lock: cancel whatever stage the acquire procedure is in and release the
     * internal lock. Calling it again returns the future of the first close.
     *
     * @return future satisfied when the close procedure is finished
     */
    @Override
    public Future<Void> asyncClose() {
        final Promise<Void> closePromise;
        synchronized (this) {
            if (closed) {
                return closeFuture;
            }
            closed = true;
            closeFuture = closePromise = new Promise<Void>();
        }
        final Promise<Void> closeWaiterFuture = new Promise<Void>();
        closeWaiterFuture.addEventListener(OrderedFutureEventListener.of(new FutureEventListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                complete();
            }
            @Override
            public void onFailure(Throwable cause) {
                complete();
            }

            private void complete() {
                FutureUtils.setValue(closePromise, null);
            }
        }, lockStateExecutor, lockPath));
        lockStateExecutor.submit(lockPath, new Runnable() {
            @Override
            public void run() {
                closeWaiter(lockWaiter, closeWaiterFuture);
            }
        });
        return closePromise;
    }

    /**
     * Submit one re-acquire attempt to the lock state executor.
     */
    void internalReacquireLock(final AtomicInteger numRetries,
                               final long lockTimeout,
                               final Promise<ZKDistributedLock> reacquirePromise) {
        lockStateExecutor.submit(lockPath, new Runnable() {
            @Override
            public void run() {
                doInternalReacquireLock(numRetries, lockTimeout, reacquirePromise);
            }
        });
    }

    /**
     * Run one re-acquire attempt. The attempt is retried while retries remain and the lock is
     * not closed, except when the lock turned out to be owned by someone else.
     *
     * @param numRetries remaining retries budget
     * @param lockTimeout timeout handed to the next retry
     * @param reacquirePromise promise to satisfy with the final outcome of the re-acquire
     */
    void doInternalReacquireLock(final AtomicInteger numRetries,
                                 final long lockTimeout,
                                 final Promise<ZKDistributedLock> reacquirePromise) {
        internalTryRetries.inc();
        Promise<ZKDistributedLock> tryPromise = new Promise<ZKDistributedLock>();
        tryPromise.addEventListener(new FutureEventListener<ZKDistributedLock>() {
            @Override
            public void onSuccess(ZKDistributedLock lock) {
                FutureUtils.setValue(reacquirePromise, lock);
            }

            @Override
            public void onFailure(Throwable cause) {
                if (cause instanceof OwnershipAcquireFailedException) {
                    // the lock has been acquired by others
                    FutureUtils.setException(reacquirePromise, cause);
                } else {
                    if (numRetries.getAndDecrement() > 0 && !closed) {
                        internalReacquireLock(numRetries, lockTimeout, reacquirePromise);
                    } else {
                        FutureUtils.setException(reacquirePromise, cause);
                    }
                }
            }
        });
        // each attempt is made with a try-lock timeout of 0
        doAsyncAcquireWithSemaphore(tryPromise, 0);
    }

    /**
     * Start re-acquiring the lock, or join the re-acquire that is already in progress.
     *
     * @param throwLockAcquireException whether to rethrow the failure of a previous re-acquire
     * @return the re-acquire future, or null if a previous re-acquire failed and
     *          <i>throwLockAcquireException</i> is false
     * @throws LockingException if the lock is closed, or if a previous re-acquire failed and
     *          <i>throwLockAcquireException</i> is true
     */
    private Future<ZKDistributedLock> reacquireLock(boolean throwLockAcquireException) throws LockingException {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        Promise<ZKDistributedLock> lockPromise;
        synchronized (this) {
            if (closed) {
                throw newLockClosedException();
            }
            if (null != lockReacquireException) {
                if (throwLockAcquireException) {
                    throw lockReacquireException;
                } else {
                    return null;
                }
            }
            if (null != lockReacquireFuture) {
                return lockReacquireFuture;
            }
            LOG.info("reacquiring lock at {}", lockPath);
            lockReacquireFuture = lockPromise = new Promise<ZKDistributedLock>();
            lockReacquireFuture.addEventListener(new FutureEventListener<ZKDistributedLock>() {
                @Override
                public void onSuccess(ZKDistributedLock lock) {
                    // if re-acquire successfully, clear the state.
                    synchronized (ZKDistributedLock.this) {
                        lockReacquireFuture = null;
                    }
                    reacquireStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
                }

                @Override
                public void onFailure(Throwable cause) {
                    // remember the failure; it is reported by later state/ownership checks
                    synchronized (ZKDistributedLock.this) {
                        if (cause instanceof LockingException) {
                            lockReacquireException = (LockingException) cause;
                        } else {
                            lockReacquireException = new LockingException(lockPath,
                                    "Exception on re-acquiring lock", cause);
                        }
                    }
                    reacquireStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
                }
            });
        }
        reacquireCount.incrementAndGet();
        internalReacquireLock(new AtomicInteger(Integer.MAX_VALUE), 0, lockPromise);
        return lockPromise;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java deleted file mode 100644 index dc57d55..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java +++ /dev/null @@ -1,1363 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.lock; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Stopwatch; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.util.FailpointUtils; -import com.twitter.distributedlog.exceptions.LockingException; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.stats.OpStatsListener; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Function0; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import com.twitter.util.Return; -import com.twitter.util.Throw; -import com.twitter.util.TimeoutException; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.util.SafeRunnable; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.net.URLEncoder; 
-import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static com.google.common.base.Charsets.UTF_8; - -/** - * A lock under a given zookeeper session. This is a one-time lock. - * It is not reusable: if lock failed, if zookeeper session is expired, if #unlock is called, - * it would be transitioned to expired or closed state. - * - * The Locking Procedure is described as below. - * - * <p> - * 0. if it is an immediate lock, it would get lock waiters first. if the lock is already held - * by someone, it would fail immediately with {@link com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException} - * with current owner. if there are no lock waiters, it would start locking procedure from 1. - * 1. prepare: create a sequential znode to identify the lock. - * 2. check lock waiters: get all lock waiters to check after prepare. if it is the first waiter, - * claim the ownership; if it is not the first waiter, but first waiter was itself (same client id and same session id) - * claim the ownership too; otherwise, it would set watcher on its sibling and wait for it to disappear. 
- * </p> - * - * <pre> - * +-----------------+ - * | INIT | ------------------------------+ - * +--------+--------+ | - * | | - * | | - * +--------v--------+ | - * | PREPARING |----------------------------+ | - * +--------+--------+ | | - * | | | - * | | | - * +--------v--------+ | | - * +-------------| PREPARED |--------------+ | | - * | +-----^---------+-+ | | | - * | | | | | | | - * | | | | | | | - * | | | | | | | - * +------V-----------+ | | | +--------v----------+ | | - * | WAITING |-------+ | | | CLAIMED | | | - * +------+-----+-----+ | | +--+----------+-----+ | | - * | | | | | | | | - * | | | | | | | | - * | | | | | | | | - * | | | +-v----------v----+ | | | - * | +-------------------->| EXPIRED | | | | - * | | +--+--------------+ | | | - * | | | | | | - * | | | | | | - * | +--------V-------V-+ | | | - * +------------>| CLOSING |<---------------+----------+--+ - * +------------------+ - * | - * | - * | - * +--------V---------+ - * | CLOSED | - * +------------------+ - * </pre> - * - * <h3>Metrics</h3> - * <ul> - * <li>tryAcquire: opstats. latency spent on try locking operations. it includes timeouts. - * <li>tryTimeouts: counter. the number of timeouts on try locking operations - * <li>unlock: opstats. latency spent on unlock operations. 
- * </ul> - */ -class ZKSessionLock implements SessionLock { - - static final Logger LOG = LoggerFactory.getLogger(ZKSessionLock.class); - - private static final String LOCK_PATH_PREFIX = "/member_"; - private static final String LOCK_PART_SEP = "_"; - - public static String getLockPathPrefixV1(String lockPath) { - // member_ - return lockPath + LOCK_PATH_PREFIX; - } - - public static String getLockPathPrefixV2(String lockPath, String clientId) throws UnsupportedEncodingException { - // member_<clientid>_ - return lockPath + LOCK_PATH_PREFIX + URLEncoder.encode(clientId, UTF_8.name()) + LOCK_PART_SEP; - } - - public static String getLockPathPrefixV3(String lockPath, String clientId, long sessionOwner) throws UnsupportedEncodingException { - // member_<clientid>_s<owner_session>_ - StringBuilder sb = new StringBuilder(); - sb.append(lockPath).append(LOCK_PATH_PREFIX).append(URLEncoder.encode(clientId, UTF_8.name())).append(LOCK_PART_SEP) - .append("s").append(String.format("%10d", sessionOwner)).append(LOCK_PART_SEP); - return sb.toString(); - } - - public static byte[] serializeClientId(String clientId) { - return clientId.getBytes(UTF_8); - } - - public static String deserializeClientId(byte[] data) { - return new String(data, UTF_8); - } - - public static String getLockIdFromPath(String path) { - // We only care about our actual id since we want to compare ourselves to siblings. 
- if (path.contains("/")) { - return path.substring(path.lastIndexOf("/") + 1); - } else { - return path; - } - } - - static final Comparator<String> MEMBER_COMPARATOR = new Comparator<String>() { - public int compare(String o1, String o2) { - int l1 = parseMemberID(o1); - int l2 = parseMemberID(o2); - return l1 - l2; - } - }; - - static enum State { - INIT, // initialized state - PREPARING, // preparing to lock, but no lock node created - PREPARED, // lock node created - CLAIMED, // claim lock ownership - WAITING, // waiting for the ownership - EXPIRED, // lock is expired - CLOSING, // lock is being closed - CLOSED, // lock is closed - } - - /** - * Convenience class for state management. Provide debuggability features by tracing unxpected state - * transitions. - */ - static class StateManagement { - - static final Logger LOG = LoggerFactory.getLogger(StateManagement.class); - - private volatile State state; - - StateManagement() { - this.state = State.INIT; - } - - public void transition(State toState) { - if (!validTransition(toState)) { - LOG.error("Invalid state transition from {} to {} ", - new Object[] { this.state, toState, getStack() }); - } - this.state = toState; - } - - private boolean validTransition(State toState) { - switch (toState) { - case INIT: - return false; - case PREPARING: - return inState(State.INIT); - case PREPARED: - return inState(State.PREPARING) || inState(State.WAITING); - case CLAIMED: - return inState(State.PREPARED); - case WAITING: - return inState(State.PREPARED); - case EXPIRED: - return isTryingOrClaimed(); - case CLOSING: - return !inState(State.CLOSED); - case CLOSED: - return inState(State.CLOSING) || inState(State.CLOSED); - default: - return false; - } - } - - private State getState() { - return state; - } - - private boolean isTryingOrClaimed() { - return inState(State.PREPARING) || inState(State.PREPARED) || - inState(State.WAITING) || inState(State.CLAIMED); - } - - public boolean isExpiredOrClosing() { - return 
inState(State.CLOSED) || inState(State.EXPIRED) || inState(State.CLOSING); - } - - public boolean isExpiredOrClosed() { - return inState(State.CLOSED) || inState(State.EXPIRED); - } - - public boolean isClosed() { - return inState(State.CLOSED); - } - - private boolean inState(final State state) { - return state == this.state; - } - - private Exception getStack() { - return new Exception(); - } - } - - private final ZooKeeperClient zkClient; - private final ZooKeeper zk; - private final String lockPath; - // Identify a unique lock - private final Pair<String, Long> lockId; - private StateManagement lockState; - private final DistributedLockContext lockContext; - - private final Promise<Boolean> acquireFuture; - private String currentId; - private String currentNode; - private String watchedNode; - private LockWatcher watcher; - private final AtomicInteger epoch = new AtomicInteger(0); - private final OrderedScheduler lockStateExecutor; - private LockListener lockListener = null; - private final long lockOpTimeout; - - private final OpStatsLogger tryStats; - private final Counter tryTimeouts; - private final OpStatsLogger unlockStats; - - ZKSessionLock(ZooKeeperClient zkClient, - String lockPath, - String clientId, - OrderedScheduler lockStateExecutor) - throws IOException { - this(zkClient, - lockPath, - clientId, - lockStateExecutor, - DistributedLogConstants.LOCK_OP_TIMEOUT_DEFAULT * 1000, NullStatsLogger.INSTANCE, - new DistributedLockContext()); - } - - /** - * Creates a distributed lock using the given {@code zkClient} to coordinate locking. - * - * @param zkClient The ZooKeeper client to use. - * @param lockPath The path used to manage the lock under. - * @param clientId client id use for lock. - * @param lockStateExecutor executor to execute all lock state changes. 
- * @param lockOpTimeout timeout of lock operations - * @param statsLogger stats logger - */ - public ZKSessionLock(ZooKeeperClient zkClient, - String lockPath, - String clientId, - OrderedScheduler lockStateExecutor, - long lockOpTimeout, - StatsLogger statsLogger, - DistributedLockContext lockContext) - throws IOException { - this.zkClient = zkClient; - try { - this.zk = zkClient.get(); - } catch (ZooKeeperClient.ZooKeeperConnectionException zce) { - throw new ZKException("Failed to get zookeeper client for lock " + lockPath, - KeeperException.Code.CONNECTIONLOSS); - } catch (InterruptedException e) { - throw new DLInterruptedException("Interrupted on getting zookeeper client for lock " + lockPath, e); - } - this.lockPath = lockPath; - this.lockId = Pair.of(clientId, this.zk.getSessionId()); - this.lockContext = lockContext; - this.lockStateExecutor = lockStateExecutor; - this.lockState = new StateManagement(); - this.lockOpTimeout = lockOpTimeout; - - this.tryStats = statsLogger.getOpStatsLogger("tryAcquire"); - this.tryTimeouts = statsLogger.getCounter("tryTimeouts"); - this.unlockStats = statsLogger.getOpStatsLogger("unlock"); - - // Attach interrupt handler to acquire future so clients can abort the future. - this.acquireFuture = new Promise<Boolean>(new com.twitter.util.Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable t) { - // This will set the lock state to closed, and begin to cleanup the zk lock node. - // We have to be careful not to block here since doing so blocks the ordered lock - // state executor which can cause deadlocks depending on how futures are chained. - ZKSessionLock.this.asyncUnlock(t); - // Note re. logging and exceptions: errors are already logged by unlockAsync. 
- return BoxedUnit.UNIT; - } - }); - } - - @Override - public ZKSessionLock setLockListener(LockListener lockListener) { - this.lockListener = lockListener; - return this; - } - - String getLockPath() { - return this.lockPath; - } - - @VisibleForTesting - AtomicInteger getEpoch() { - return epoch; - } - - @VisibleForTesting - State getLockState() { - return lockState.getState(); - } - - @VisibleForTesting - Pair<String, Long> getLockId() { - return lockId; - } - - public boolean isLockExpired() { - return lockState.isExpiredOrClosing(); - } - - @Override - public boolean isLockHeld() { - return lockState.inState(State.CLAIMED); - } - - /** - * Execute a lock action of a given <i>lockEpoch</i> in ordered safe way. - * - * @param lockEpoch - * lock epoch - * @param func - * function to execute a lock action - */ - protected void executeLockAction(final int lockEpoch, final LockAction func) { - lockStateExecutor.submit(lockPath, new SafeRunnable() { - @Override - public void safeRun() { - if (ZKSessionLock.this.epoch.get() == lockEpoch) { - if (LOG.isTraceEnabled()) { - LOG.trace("{} executing lock action '{}' under epoch {} for lock {}", - new Object[]{lockId, func.getActionName(), lockEpoch, lockPath}); - } - func.execute(); - if (LOG.isTraceEnabled()) { - LOG.trace("{} executed lock action '{}' under epoch {} for lock {}", - new Object[]{lockId, func.getActionName(), lockEpoch, lockPath}); - } - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("{} skipped executing lock action '{}' for lock {}, since epoch is changed from {} to {}.", - new Object[]{lockId, func.getActionName(), lockPath, lockEpoch, ZKSessionLock.this.epoch.get()}); - } - } - } - }); - } - - /** - * Execute a lock action of a given <i>lockEpoch</i> in ordered safe way. 
If the lock action couldn't be - * executed because the epoch changed, fail the given <i>promise</i> with - * {@link EpochChangedException} - * - * @param lockEpoch - * lock epoch - * @param func - * function to execute a lock action - * @param promise - * promise - */ - protected <T> void executeLockAction(final int lockEpoch, final LockAction func, final Promise<T> promise) { - lockStateExecutor.submit(lockPath, new SafeRunnable() { - @Override - public void safeRun() { - int currentEpoch = ZKSessionLock.this.epoch.get(); - if (currentEpoch == lockEpoch) { - if (LOG.isTraceEnabled()) { - LOG.trace("{} executed lock action '{}' under epoch {} for lock {}", - new Object[]{lockId, func.getActionName(), lockEpoch, lockPath}); - } - func.execute(); - if (LOG.isTraceEnabled()) { - LOG.trace("{} executed lock action '{}' under epoch {} for lock {}", - new Object[]{lockId, func.getActionName(), lockEpoch, lockPath}); - } - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("{} skipped executing lock action '{}' for lock {}, since epoch is changed from {} to {}.", - new Object[]{lockId, func.getActionName(), lockPath, lockEpoch, currentEpoch}); - } - promise.setException(new EpochChangedException(lockPath, lockEpoch, currentEpoch)); - } - } - }); - } - - /** - * Parse member id generated by zookeeper from given <i>nodeName</i> - * - * @param nodeName - * lock node name - * @return member id generated by zookeeper - */ - static int parseMemberID(String nodeName) { - int id = -1; - String[] parts = nodeName.split("_"); - if (parts.length > 0) { - try { - id = Integer.parseInt(parts[parts.length - 1]); - } catch (NumberFormatException nfe) { - // set it to MAX_VALUE, so the bad znode will never acquire the lock - id = Integer.MAX_VALUE; - } - } - return id; - } - - static boolean areLockWaitersInSameSession(String node1, String node2) { - String[] parts1 = node1.split("_"); - String[] parts2 = node2.split("_"); - if (parts1.length != 4 || parts2.length != 4) { - return 
node1.equals(node2); - } - if (!parts1[2].startsWith("s") || !parts2[2].startsWith("s")) { - return node1.equals(node2); - } - long sessionOwner1 = Long.parseLong(parts1[2].substring(1)); - long sessionOwner2 = Long.parseLong(parts2[2].substring(1)); - if (sessionOwner1 != sessionOwner2) { - return false; - } - String clientId1, clientId2; - try { - clientId1 = URLDecoder.decode(parts1[1], UTF_8.name()); - clientId2 = URLDecoder.decode(parts2[1], UTF_8.name()); - return clientId1.equals(clientId2); - } catch (UnsupportedEncodingException e) { - // if failed to parse client id, we have to get client id by zookeeper#getData. - return node1.equals(node2); - } - } - - /** - * Get client id and its ephemeral owner. - * - * @param zkClient - * zookeeper client - * @param lockPath - * lock path - * @param nodeName - * node name - * @return client id and its ephemeral owner. - */ - static Future<Pair<String, Long>> asyncParseClientID(ZooKeeper zkClient, String lockPath, String nodeName) { - String[] parts = nodeName.split("_"); - // member_<clientid>_s<owner_session>_ - if (4 == parts.length && parts[2].startsWith("s")) { - long sessionOwner = Long.parseLong(parts[2].substring(1)); - String clientId; - try { - clientId = URLDecoder.decode(parts[1], UTF_8.name()); - return Future.value(Pair.of(clientId, sessionOwner)); - } catch (UnsupportedEncodingException e) { - // if failed to parse client id, we have to get client id by zookeeper#getData. 
- } - } - final Promise<Pair<String, Long>> promise = new Promise<Pair<String, Long>>(); - zkClient.getData(lockPath + "/" + nodeName, false, new AsyncCallback.DataCallback() { - @Override - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - if (KeeperException.Code.OK.intValue() != rc) { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - } else { - promise.setValue(Pair.of(deserializeClientId(data), stat.getEphemeralOwner())); - } - } - }, null); - return promise; - } - - @Override - public Future<LockWaiter> asyncTryLock(final long timeout, final TimeUnit unit) { - final Promise<String> result = new Promise<String>(); - final boolean wait = DistributedLogConstants.LOCK_IMMEDIATE != timeout; - if (wait) { - asyncTryLock(wait, result); - } else { - // try to check locks first - zk.getChildren(lockPath, null, new AsyncCallback.Children2Callback() { - @Override - public void processResult(final int rc, String path, Object ctx, - final List<String> children, Stat stat) { - lockStateExecutor.submit(lockPath, new SafeRunnable() { - @Override - public void safeRun() { - if (!lockState.inState(State.INIT)) { - result.setException(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState())); - return; - } - if (KeeperException.Code.OK.intValue() != rc) { - result.setException(KeeperException.create(KeeperException.Code.get(rc))); - return; - } - - FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockTryAcquire); - - Collections.sort(children, MEMBER_COMPARATOR); - if (children.size() > 0) { - asyncParseClientID(zk, lockPath, children.get(0)).addEventListener( - new FutureEventListener<Pair<String, Long>>() { - @Override - public void onSuccess(Pair<String, Long> owner) { - if (!checkOrClaimLockOwner(owner, result)) { - acquireFuture.updateIfEmpty(new Return<Boolean>(false)); - } - } - - @Override - public void onFailure(final Throwable cause) { - 
lockStateExecutor.submit(lockPath, new SafeRunnable() { - @Override - public void safeRun() { - result.setException(cause); - } - }); - } - }); - } else { - asyncTryLock(wait, result); - } - } - }); - } - }, null); - } - - final Promise<Boolean> waiterAcquireFuture = new Promise<Boolean>(new com.twitter.util.Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable t) { - acquireFuture.raise(t); - return BoxedUnit.UNIT; - } - }); - return result.map(new AbstractFunction1<String, LockWaiter>() { - @Override - public LockWaiter apply(final String currentOwner) { - final Exception acquireException = new OwnershipAcquireFailedException(lockPath, currentOwner); - FutureUtils.within( - acquireFuture, - timeout, - unit, - acquireException, - lockStateExecutor, - lockPath - ).addEventListener(new FutureEventListener<Boolean>() { - - @Override - public void onSuccess(Boolean acquired) { - completeOrFail(acquireException); - } - - @Override - public void onFailure(final Throwable acquireCause) { - completeOrFail(acquireException); - } - - private void completeOrFail(final Throwable acquireCause) { - if (isLockHeld()) { - waiterAcquireFuture.setValue(true); - } else { - asyncUnlock().addEventListener(new FutureEventListener<BoxedUnit>() { - @Override - public void onSuccess(BoxedUnit value) { - waiterAcquireFuture.setException(acquireCause); - } - - @Override - public void onFailure(Throwable cause) { - waiterAcquireFuture.setException(acquireCause); - } - }); - } - } - });; - return new LockWaiter( - lockId.getLeft(), - currentOwner, - waiterAcquireFuture); - } - }); - } - - private boolean checkOrClaimLockOwner(final Pair<String, Long> currentOwner, - final Promise<String> result) { - if (lockId.compareTo(currentOwner) != 0 && !lockContext.hasLockId(currentOwner)) { - lockStateExecutor.submit(lockPath, new SafeRunnable() { - @Override - public void safeRun() { - result.setValue(currentOwner.getLeft()); - } - }); - return false; - } - // current owner 
is itself - final int curEpoch = epoch.incrementAndGet(); - executeLockAction(curEpoch, new LockAction() { - @Override - public void execute() { - if (!lockState.inState(State.INIT)) { - result.setException(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState())); - return; - } - asyncTryLock(false, result); - } - @Override - public String getActionName() { - return "claimOwnership(owner=" + currentOwner + ")"; - } - }, result); - return true; - } - - /** - * Try lock. If it failed, it would cleanup its attempt. - * - * @param wait - * whether to wait for ownership. - * @param result - * promise to satisfy with current lock owner - */ - private void asyncTryLock(boolean wait, final Promise<String> result) { - final Promise<String> lockResult = new Promise<String>(); - lockResult.addEventListener(new FutureEventListener<String>() { - @Override - public void onSuccess(String currentOwner) { - result.setValue(currentOwner); - } - - @Override - public void onFailure(final Throwable lockCause) { - // If tryLock failed due to state changed, we don't need to cleanup - if (lockCause instanceof LockStateChangedException) { - LOG.info("skipping cleanup for {} at {} after encountering lock " + - "state change exception : ", new Object[] { lockId, lockPath, lockCause }); - result.setException(lockCause); - return; - } - if (LOG.isDebugEnabled()) { - LOG.debug("{} is cleaning up its lock state for {} due to : ", - new Object[] { lockId, lockPath, lockCause }); - } - - // If we encountered any exception we should cleanup - Future<BoxedUnit> unlockResult = asyncUnlock(); - unlockResult.addEventListener(new FutureEventListener<BoxedUnit>() { - @Override - public void onSuccess(BoxedUnit value) { - result.setException(lockCause); - } - @Override - public void onFailure(Throwable cause) { - result.setException(lockCause); - } - }); - } - }); - asyncTryLockWithoutCleanup(wait, lockResult); - } - - /** - * Try lock. 
If wait is true, it would wait and watch sibling to acquire lock when - * the sibling is dead. <i>acquireFuture</i> will be notified either it locked successfully - * or the lock failed. The promise will only satisfy with current lock owner. - * - * NOTE: the <i>promise</i> is only satisfied on <i>lockStateExecutor</i>, so any - * transformations attached on promise will be executed in order. - * - * @param wait - * whether to wait for ownership. - * @param promise - * promise to satisfy with current lock owner. - */ - private void asyncTryLockWithoutCleanup(final boolean wait, final Promise<String> promise) { - executeLockAction(epoch.get(), new LockAction() { - @Override - public void execute() { - if (!lockState.inState(State.INIT)) { - promise.setException(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState())); - return; - } - lockState.transition(State.PREPARING); - - final int curEpoch = epoch.incrementAndGet(); - watcher = new LockWatcher(curEpoch); - // register watcher for session expires - zkClient.register(watcher); - // Encode both client id and session in the lock node - String myPath; - try { - // member_<clientid>_s<owner_session>_ - myPath = getLockPathPrefixV3(lockPath, lockId.getLeft(), lockId.getRight()); - } catch (UnsupportedEncodingException uee) { - myPath = getLockPathPrefixV1(lockPath); - } - zk.create(myPath, serializeClientId(lockId.getLeft()), zkClient.getDefaultACL(), CreateMode.EPHEMERAL_SEQUENTIAL, - new AsyncCallback.StringCallback() { - @Override - public void processResult(final int rc, String path, Object ctx, final String name) { - executeLockAction(curEpoch, new LockAction() { - @Override - public void execute() { - if (KeeperException.Code.OK.intValue() != rc) { - KeeperException ke = KeeperException.create(KeeperException.Code.get(rc)); - promise.setException(ke); - return; - } - - if (FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockTryCloseRaceCondition)) { - 
lockState.transition(State.CLOSING); - lockState.transition(State.CLOSED); - } - - if (null != currentNode) { - LOG.error("Current node for {} overwritten current = {} new = {}", - new Object[] { lockPath, lockId, getLockIdFromPath(currentNode) }); - } - - currentNode = name; - currentId = getLockIdFromPath(currentNode); - LOG.trace("{} received member id for lock {}", lockId, currentId); - - if (lockState.isExpiredOrClosing()) { - // Delete node attempt may have come after PREPARING but before create node, in which case - // we'd be left with a dangling node unless we clean up. - Promise<BoxedUnit> deletePromise = new Promise<BoxedUnit>(); - deleteLockNode(deletePromise); - deletePromise.ensure(new Function0<BoxedUnit>() { - public BoxedUnit apply() { - promise.setException(new LockClosedException(lockPath, lockId, lockState.getState())); - return BoxedUnit.UNIT; - } - }); - return; - } - - lockState.transition(State.PREPARED); - checkLockOwnerAndWaitIfPossible(watcher, wait, promise); - } - - @Override - public String getActionName() { - return "postPrepare(wait=" + wait + ")"; - } - }); - } - }, null); - } - @Override - public String getActionName() { - return "prepare(wait=" + wait + ")"; - } - }, promise); - } - - @Override - public void tryLock(long timeout, TimeUnit unit) throws LockingException { - final Stopwatch stopwatch = Stopwatch.createStarted(); - Future<LockWaiter> tryFuture = asyncTryLock(timeout, unit); - LockWaiter waiter = waitForTry(stopwatch, tryFuture); - boolean acquired = waiter.waitForAcquireQuietly(); - if (!acquired) { - throw new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner()); - } - } - - synchronized LockWaiter waitForTry(Stopwatch stopwatch, Future<LockWaiter> tryFuture) - throws LockingException { - boolean success = false; - boolean stateChanged = false; - LockWaiter waiter; - try { - waiter = Await.result(tryFuture, Duration.fromMilliseconds(lockOpTimeout)); - success = true; - } catch 
(LockStateChangedException ex) { - stateChanged = true; - throw ex; - } catch (LockingException ex) { - throw ex; - } catch (TimeoutException toe) { - tryTimeouts.inc(); - throw new LockingException(lockPath, "Timeout during try phase of lock acquire", toe); - } catch (Exception ex) { - String message = getLockId() + " failed to lock " + lockPath; - throw new LockingException(lockPath, message, ex); - } finally { - if (success) { - tryStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } else { - tryStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } - // This can only happen for a Throwable that's not an - // Exception, i.e. an Error - if (!success && !stateChanged) { - unlock(); - } - } - return waiter; - } - - @Override - public Future<BoxedUnit> asyncUnlock() { - return asyncUnlock(new LockClosedException(lockPath, lockId, lockState.getState())); - } - - Future<BoxedUnit> asyncUnlock(final Throwable cause) { - final Promise<BoxedUnit> promise = new Promise<BoxedUnit>(); - - // Use lock executor here rather than lock action, because we want this operation to be applied - // whether the epoch has changed or not. The member node is EPHEMERAL_SEQUENTIAL so there's no - // risk of an ABA problem where we delete and recreate a node and then delete it again here. - lockStateExecutor.submit(lockPath, new SafeRunnable() { - @Override - public void safeRun() { - acquireFuture.updateIfEmpty(new Throw<Boolean>(cause)); - unlockInternal(promise); - promise.addEventListener(new OpStatsListener<BoxedUnit>(unlockStats)); - } - }); - - return promise; - } - - @Override - public void unlock() { - Future<BoxedUnit> unlockResult = asyncUnlock(); - try { - Await.result(unlockResult, Duration.fromMilliseconds(lockOpTimeout)); - } catch (TimeoutException toe) { - // This shouldn't happen unless we lose a watch, and may result in a leaked lock. 
- LOG.error("Timeout unlocking {} owned by {} : ", new Object[] { lockPath, lockId, toe }); - } catch (Exception e) { - LOG.warn("{} failed to unlock {} : ", new Object[] { lockId, lockPath, e }); - } - } - - // Lock State Changes (all state changes should be executed under a LockAction) - - private void claimOwnership(int lockEpoch) { - lockState.transition(State.CLAIMED); - // clear previous lock ids - lockContext.clearLockIds(); - // add current lock id - lockContext.addLockId(lockId); - if (LOG.isDebugEnabled()) { - LOG.debug("Notify lock waiters on {} at {} : watcher epoch {}, lock epoch {}", - new Object[] { lockPath, System.currentTimeMillis(), - lockEpoch, ZKSessionLock.this.epoch.get() }); - } - acquireFuture.updateIfEmpty(new Return<Boolean>(true)); - } - - /** - * NOTE: unlockInternal should only be called after try lock. - */ - private void unlockInternal(final Promise<BoxedUnit> promise) { - - // already closed or expired, nothing to cleanup - this.epoch.incrementAndGet(); - if (null != watcher) { - this.zkClient.unregister(watcher); - } - - if (lockState.inState(State.CLOSED)) { - promise.setValue(BoxedUnit.UNIT); - return; - } - - LOG.info("Lock {} for {} is closed from state {}.", - new Object[] { lockId, lockPath, lockState.getState() }); - - final boolean skipCleanup = lockState.inState(State.INIT) || lockState.inState(State.EXPIRED); - - lockState.transition(State.CLOSING); - - if (skipCleanup) { - // Nothing to cleanup if INIT (never tried) or EXPIRED (ephemeral node - // auto-removed) - lockState.transition(State.CLOSED); - promise.setValue(BoxedUnit.UNIT); - return; - } - - // In any other state, we should clean up the member node - Promise<BoxedUnit> deletePromise = new Promise<BoxedUnit>(); - deleteLockNode(deletePromise); - - // Set the state to closed after we've cleaned up - deletePromise.addEventListener(new FutureEventListener<BoxedUnit>() { - @Override - public void onSuccess(BoxedUnit complete) { - lockStateExecutor.submit(lockPath, new 
SafeRunnable() { - @Override - public void safeRun() { - lockState.transition(State.CLOSED); - promise.setValue(BoxedUnit.UNIT); - } - }); - } - @Override - public void onFailure(Throwable cause) { - // Delete failure is quite serious (causes lock leak) and should be - // handled better - LOG.error("lock node delete failed {} {}", lockId, lockPath); - promise.setValue(BoxedUnit.UNIT); - } - }); - } - - private void deleteLockNode(final Promise<BoxedUnit> promise) { - if (null == currentNode) { - promise.setValue(BoxedUnit.UNIT); - return; - } - - zk.delete(currentNode, -1, new AsyncCallback.VoidCallback() { - @Override - public void processResult(final int rc, final String path, Object ctx) { - lockStateExecutor.submit(lockPath, new SafeRunnable() { - @Override - public void safeRun() { - if (KeeperException.Code.OK.intValue() == rc) { - LOG.info("Deleted lock node {} for {} successfully.", path, lockId); - } else if (KeeperException.Code.NONODE.intValue() == rc || - KeeperException.Code.SESSIONEXPIRED.intValue() == rc) { - LOG.info("Delete node failed. Node already gone for node {} id {}, rc = {}", - new Object[] { path, lockId, KeeperException.Code.get(rc) }); - } else { - LOG.error("Failed on deleting lock node {} for {} : {}", - new Object[] { path, lockId, KeeperException.Code.get(rc) }); - } - - FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockUnlockCleanup); - promise.setValue(BoxedUnit.UNIT); - } - }); - } - }, null); - } - - /** - * Handle session expired for lock watcher at epoch <i>lockEpoch</i>. - * - * @param lockEpoch - * lock epoch - */ - private void handleSessionExpired(final int lockEpoch) { - executeLockAction(lockEpoch, new LockAction() { - @Override - public void execute() { - if (lockState.inState(State.CLOSED) || lockState.inState(State.CLOSING)) { - // Already fully closed, no need to process expire. 
- return; - } - - boolean shouldNotifyLockListener = lockState.inState(State.CLAIMED); - - lockState.transition(State.EXPIRED); - - // remove the watcher - if (null != watcher) { - zkClient.unregister(watcher); - } - - // increment epoch to avoid any ongoing locking action - ZKSessionLock.this.epoch.incrementAndGet(); - - // if session expired, just notify the waiter. as the lock acquire doesn't succeed. - // we don't even need to clean up the lock as the znode will disappear after session expired - acquireFuture.updateIfEmpty(new Throw<Boolean>( - new LockSessionExpiredException(lockPath, lockId, lockState.getState()))); - - // session expired, ephemeral node is gone. - currentNode = null; - currentId = null; - - if (shouldNotifyLockListener) { - // if session expired after claimed, we need to notify the caller to re-lock - if (null != lockListener) { - lockListener.onExpired(); - } - } - } - - @Override - public String getActionName() { - return "handleSessionExpired(epoch=" + lockEpoch + ")"; - } - }); - } - - private void handleNodeDelete(int lockEpoch, final WatchedEvent event) { - executeLockAction(lockEpoch, new LockAction() { - @Override - public void execute() { - // The lock is either expired or closed - if (!lockState.inState(State.WAITING)) { - LOG.info("{} ignore watched node {} deleted event, since lock state has moved to {}.", - new Object[] { lockId, event.getPath(), lockState.getState() }); - return; - } - lockState.transition(State.PREPARED); - - // we don't need to wait and check the result, since: - // 1) if it claimed the ownership, it would notify the waiters when claimed ownerships - // 2) if it failed, it would also notify the waiters, the waiters would cleanup the state. 
- checkLockOwnerAndWaitIfPossible(watcher, true); - } - - @Override - public String getActionName() { - return "handleNodeDelete(path=" + event.getPath() + ")"; - } - }); - } - - private Future<String> checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher, - final boolean wait) { - final Promise<String> promise = new Promise<String>(); - checkLockOwnerAndWaitIfPossible(lockWatcher, wait, promise); - return promise; - } - - /** - * Check Lock Owner Phase 1 : Get all lock waiters. - * - * @param lockWatcher - * lock watcher. - * @param wait - * whether to wait for ownership. - * @param promise - * promise to satisfy with current lock owner - */ - private void checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher, - final boolean wait, - final Promise<String> promise) { - zk.getChildren(lockPath, false, new AsyncCallback.Children2Callback() { - @Override - public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - processLockWaiters(lockWatcher, wait, rc, children, promise); - } - }, null); - } - - /** - * Check Lock Owner Phase 2 : check all lock waiters to get current owner and wait for ownership if necessary. - * - * @param lockWatcher - * lock watcher. - * @param wait - * whether to wait for ownership. - * @param getChildrenRc - * result of getting all lock waiters - * @param children - * current lock waiters. - * @param promise - * promise to satisfy with current lock owner. - */ - private void processLockWaiters(final LockWatcher lockWatcher, - final boolean wait, - final int getChildrenRc, - final List<String> children, - final Promise<String> promise) { - executeLockAction(lockWatcher.epoch, new LockAction() { - @Override - public void execute() { - if (!lockState.inState(State.PREPARED)) { // e.g. 
lock closed or session expired after prepared - promise.setException(new LockStateChangedException(lockPath, lockId, State.PREPARED, lockState.getState())); - return; - } - - if (KeeperException.Code.OK.intValue() != getChildrenRc) { - promise.setException(KeeperException.create(KeeperException.Code.get(getChildrenRc))); - return; - } - if (children.isEmpty()) { - LOG.error("Error, member list is empty for lock {}.", lockPath); - promise.setException(new UnexpectedException("Empty member list for lock " + lockPath)); - return; - } - - // sort the children - Collections.sort(children, MEMBER_COMPARATOR); - final String cid = currentId; - final int memberIndex = children.indexOf(cid); - if (LOG.isDebugEnabled()) { - LOG.debug("{} is the number {} member in the list.", cid, memberIndex); - } - // If we hold the lock - if (memberIndex == 0) { - LOG.info("{} acquired the lock {}.", cid, lockPath); - claimOwnership(lockWatcher.epoch); - promise.setValue(cid); - } else if (memberIndex > 0) { // we are in the member list but we didn't hold the lock - // get ownership of current owner - asyncParseClientID(zk, lockPath, children.get(0)).addEventListener(new FutureEventListener<Pair<String, Long>>() { - @Override - public void onSuccess(Pair<String, Long> currentOwner) { - watchLockOwner(lockWatcher, wait, - cid, children.get(memberIndex - 1), children.get(0), currentOwner, promise); - } - @Override - public void onFailure(final Throwable cause) { - // ensure promise is satisfied in lock thread - executeLockAction(lockWatcher.epoch, new LockAction() { - @Override - public void execute() { - promise.setException(cause); - } - - @Override - public String getActionName() { - return "handleFailureOnParseClientID(lockPath=" + lockPath + ")"; - } - }, promise); - } - }); - } else { - LOG.error("Member {} doesn't exist in the members list {} for lock {}.", - new Object[]{ cid, children, lockPath}); - promise.setException( - new UnexpectedException("Member " + cid + " doesn't exist 
in member list " + - children + " for lock " + lockPath)); - } - } - - @Override - public String getActionName() { - return "processLockWaiters(rc=" + getChildrenRc + ", waiters=" + children + ")"; - } - }, promise); - } - - /** - * Check Lock Owner Phase 3: watch sibling node for lock ownership. - * - * @param lockWatcher - * lock watcher. - * @param wait - * whether to wait for ownership. - * @param myNode - * my lock node. - * @param siblingNode - * my sibling lock node. - * @param ownerNode - * owner lock node. - * @param currentOwner - * current owner info. - * @param promise - * promise to satisfy with current lock owner. - */ - private void watchLockOwner(final LockWatcher lockWatcher, - final boolean wait, - final String myNode, - final String siblingNode, - final String ownerNode, - final Pair<String, Long> currentOwner, - final Promise<String> promise) { - executeLockAction(lockWatcher.epoch, new LockAction() { - @Override - public void execute() { - boolean shouldWatch; - final boolean shouldClaimOwnership; - if (lockContext.hasLockId(currentOwner) && siblingNode.equals(ownerNode)) { - // if the current owner is the znode left from previous session - // we should watch it and claim ownership - shouldWatch = true; - shouldClaimOwnership = true; - LOG.info("LockWatcher {} for {} found its previous session {} held lock, watch it to claim ownership.", - new Object[] { myNode, lockPath, currentOwner }); - } else if (lockId.compareTo(currentOwner) == 0 && areLockWaitersInSameSession(siblingNode, ownerNode)) { - // I found that my sibling is the current owner with same lock id (client id & session id) - // It must be left by any race condition from same zookeeper client - shouldWatch = true; - shouldClaimOwnership = true; - LOG.info("LockWatcher {} for {} found itself {} already held lock at sibling node {}, watch it to claim ownership.", - new Object[]{myNode, lockPath, lockId, siblingNode}); - } else { - shouldWatch = wait; - if (wait) { - if 
(LOG.isDebugEnabled()) { - LOG.debug("Current LockWatcher for {} with ephemeral node {}, is waiting for {} to release lock at {}.", - new Object[]{lockPath, myNode, siblingNode, System.currentTimeMillis()}); - } - } - shouldClaimOwnership = false; - } - - // watch sibling for lock ownership - if (shouldWatch) { - watchedNode = String.format("%s/%s", lockPath, siblingNode); - zk.exists(watchedNode, lockWatcher, new AsyncCallback.StatCallback() { - @Override - public void processResult(final int rc, String path, Object ctx, final Stat stat) { - executeLockAction(lockWatcher.epoch, new LockAction() { - @Override - public void execute() { - if (!lockState.inState(State.PREPARED)) { - promise.setException(new LockStateChangedException(lockPath, lockId, State.PREPARED, lockState.getState())); - return; - } - - if (KeeperException.Code.OK.intValue() == rc) { - if (shouldClaimOwnership) { - // watch owner successfully - LOG.info("LockWatcher {} claimed ownership for {} after set watcher on {}.", - new Object[]{ myNode, lockPath, ownerNode }); - claimOwnership(lockWatcher.epoch); - promise.setValue(currentOwner.getLeft()); - } else { - // watch sibling successfully - lockState.transition(State.WAITING); - promise.setValue(currentOwner.getLeft()); - } - } else if (KeeperException.Code.NONODE.intValue() == rc) { - // sibling just disappeared, it might be the chance to claim ownership - checkLockOwnerAndWaitIfPossible(lockWatcher, wait, promise); - } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - } - } - - @Override - public String getActionName() { - StringBuilder sb = new StringBuilder(); - sb.append("postWatchLockOwner(myNode=").append(myNode).append(", siblingNode=") - .append(siblingNode).append(", ownerNode=").append(ownerNode).append(")"); - return sb.toString(); - } - }, promise); - } - }, null); - } else { - promise.setValue(currentOwner.getLeft()); - } - } - - @Override - public String getActionName() { - StringBuilder sb = new 
StringBuilder(); - sb.append("watchLockOwner(myNode=").append(myNode).append(", siblingNode=") - .append(siblingNode).append(", ownerNode=").append(ownerNode).append(")"); - return sb.toString(); - } - }, promise); - } - - class LockWatcher implements Watcher { - - // Enforce an epoch number to avoid a race when canceling an attempt - final int epoch; - - LockWatcher(int epoch) { - this.epoch = epoch; - } - - @Override - public void process(WatchedEvent event) { - LOG.debug("Received event {} from lock {} at {} : watcher epoch {}, lock epoch {}.", - new Object[] { event, lockPath, System.currentTimeMillis(), epoch, ZKSessionLock.this.epoch.get() }); - if (event.getType() == Watcher.Event.EventType.None) { - switch (event.getState()) { - case SyncConnected: - break; - case Expired: - LOG.info("Session {} is expired for lock {} at {} : watcher epoch {}, lock epoch {}.", - new Object[] { lockId.getRight(), lockPath, System.currentTimeMillis(), - epoch, ZKSessionLock.this.epoch.get() }); - handleSessionExpired(epoch); - break; - default: - break; - } - } else if (event.getType() == Event.EventType.NodeDeleted) { - // this handles the case where we have aborted a lock and deleted ourselves but still have a - // watch on the nextLowestNode. This is a workaround since ZK doesn't support unsub. - if (!event.getPath().equals(watchedNode)) { - LOG.warn("{} (watching {}) ignored watched event from {} ", - new Object[] { lockId, watchedNode, event.getPath() }); - return; - } - handleNodeDelete(epoch, event); - } else { - LOG.warn("Unexpected ZK event: {}", event.getType().name()); - } - } - - } -}