http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java index a8eb482..77151df 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java @@ -19,28 +19,24 @@ package org.apache.distributedlog.lock; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; -import com.twitter.concurrent.AsyncSemaphore; +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; import org.apache.distributedlog.exceptions.LockingException; import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; import org.apache.distributedlog.exceptions.UnexpectedException; -import org.apache.distributedlog.util.FutureUtils; -import org.apache.distributedlog.util.FutureUtils.OrderedFutureEventListener; +import org.apache.distributedlog.common.concurrent.AsyncSemaphore; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction0; -import scala.runtime.BoxedUnit; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** * Distributed lock, using ZooKeeper. @@ -78,22 +74,22 @@ public class ZKDistributedLock implements LockListener, DistributedLock { private final long lockTimeout; private final DistributedLockContext lockContext = new DistributedLockContext(); - private final AsyncSemaphore lockSemaphore = new AsyncSemaphore(1); + private final AsyncSemaphore lockSemaphore = new AsyncSemaphore(1, Optional.empty()); // We have two lock acquire futures: // 1. lock acquire future: for the initial acquire op // 2. lock reacquire future: for reacquire necessary when session expires, lock is closed - private Future<ZKDistributedLock> lockAcquireFuture = null; - private Future<ZKDistributedLock> lockReacquireFuture = null; + private CompletableFuture<ZKDistributedLock> lockAcquireFuture = null; + private CompletableFuture<ZKDistributedLock> lockReacquireFuture = null; // following variable tracking the status of acquire process // => create (internalLock) => tryLock (tryLockFuture) => waitForAcquire (lockWaiter) private SessionLock internalLock = null; - private Future<LockWaiter> tryLockFuture = null; + private CompletableFuture<LockWaiter> tryLockFuture = null; private LockWaiter lockWaiter = null; // exception indicating if the reacquire failed private LockingException lockReacquireException = null; // closeFuture private volatile boolean closed = false; - private Future<Void> closeFuture = null; + private CompletableFuture<Void> closeFuture = null; // A counter to track how many re-acquires happened during a lock's life cycle. private final AtomicInteger reacquireCount = new AtomicInteger(0); @@ -136,25 +132,19 @@ public class ZKDistributedLock implements LockListener, DistributedLock { * Asynchronously acquire the lock. Technically the try phase of this operation--which adds us to the waiter * list--is executed synchronously, but the lock wait itself doesn't block. */ - public synchronized Future<ZKDistributedLock> asyncAcquire() { + public synchronized CompletableFuture<ZKDistributedLock> asyncAcquire() { if (null != lockAcquireFuture) { - return Future.exception(new UnexpectedException("Someone is already acquiring/acquired lock " + lockPath)); + return FutureUtils.exception(new UnexpectedException("Someone is already acquiring/acquired lock " + lockPath)); } - final Promise<ZKDistributedLock> promise = - new Promise<ZKDistributedLock>(new Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable cause) { - lockStateExecutor.submit(lockPath, new Runnable() { - @Override - public void run() { - asyncClose(); - } - }); - return BoxedUnit.UNIT; + final CompletableFuture<ZKDistributedLock> promise = FutureUtils.createFuture(); + promise.whenComplete((zkDistributedLock, throwable) -> { + if (null == throwable || !(throwable instanceof CancellationException)) { + return; } + lockStateExecutor.submit(lockPath, () -> asyncClose()); }); final Stopwatch stopwatch = Stopwatch.createStarted(); - promise.addEventListener(new FutureEventListener<ZKDistributedLock>() { + promise.whenComplete(new FutureEventListener<ZKDistributedLock>() { @Override public void onSuccess(ZKDistributedLock lock) { acquireStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); @@ -176,41 +166,39 @@ public class ZKDistributedLock implements LockListener, DistributedLock { return promise; } - void doAsyncAcquireWithSemaphore(final Promise<ZKDistributedLock> acquirePromise, + void doAsyncAcquireWithSemaphore(final CompletableFuture<ZKDistributedLock> acquirePromise, final long lockTimeout) { - lockSemaphore.acquireAndRun(new AbstractFunction0<Future<ZKDistributedLock>>() { - @Override - public Future<ZKDistributedLock> apply() { - doAsyncAcquire(acquirePromise, lockTimeout); - return acquirePromise; - } + lockSemaphore.acquireAndRun(() -> { + doAsyncAcquire(acquirePromise, lockTimeout); + return acquirePromise; }); } - void doAsyncAcquire(final Promise<ZKDistributedLock> acquirePromise, + void doAsyncAcquire(final CompletableFuture<ZKDistributedLock> acquirePromise, final long lockTimeout) { LOG.trace("Async Lock Acquire {}", lockPath); try { checkLockState(); } catch (IOException ioe) { - FutureUtils.setException(acquirePromise, ioe); + FutureUtils.completeExceptionally(acquirePromise, ioe); return; } if (haveLock()) { // it already hold the lock - FutureUtils.setValue(acquirePromise, this); + FutureUtils.complete(acquirePromise, this); return; } - lockFactory.createLock(lockPath, lockContext).addEventListener(OrderedFutureEventListener.of( - new FutureEventListener<SessionLock>() { + lockFactory + .createLock(lockPath, lockContext) + .whenCompleteAsync(new FutureEventListener<SessionLock>() { @Override public void onSuccess(SessionLock lock) { synchronized (ZKDistributedLock.this) { if (closed) { LOG.info("Skipping tryLocking lock {} since it is already closed", lockPath); - FutureUtils.setException(acquirePromise, newLockClosedException()); + FutureUtils.completeExceptionally(acquirePromise, newLockClosedException()); return; } } @@ -223,62 +211,64 @@ public class ZKDistributedLock implements LockListener, DistributedLock { @Override public void onFailure(Throwable cause) { - FutureUtils.setException(acquirePromise, cause); + FutureUtils.completeExceptionally(acquirePromise, cause); } - }, lockStateExecutor, lockPath)); + }, lockStateExecutor.chooseExecutor(lockPath)); } void asyncTryLock(SessionLock lock, - final Promise<ZKDistributedLock> acquirePromise, + final CompletableFuture<ZKDistributedLock> acquirePromise, final long lockTimeout) { if (null != tryLockFuture) { - tryLockFuture.cancel(); + tryLockFuture.cancel(true); } tryLockFuture = lock.asyncTryLock(lockTimeout, TimeUnit.MILLISECONDS); - tryLockFuture.addEventListener(OrderedFutureEventListener.of( - new FutureEventListener<LockWaiter>() { - @Override - public void onSuccess(LockWaiter waiter) { - synchronized (ZKDistributedLock.this) { - if (closed) { - LOG.info("Skipping acquiring lock {} since it is already closed", lockPath); - waiter.getAcquireFuture().raise(new LockingException(lockPath, "lock is already closed.")); - FutureUtils.setException(acquirePromise, newLockClosedException()); - return; - } + tryLockFuture.whenCompleteAsync( + new FutureEventListener<LockWaiter>() { + @Override + public void onSuccess(LockWaiter waiter) { + synchronized (ZKDistributedLock.this) { + if (closed) { + LOG.info("Skipping acquiring lock {} since it is already closed", lockPath); + waiter + .getAcquireFuture() + .completeExceptionally(new LockingException(lockPath, "lock is already closed.")); + FutureUtils.completeExceptionally(acquirePromise, newLockClosedException()); + return; } - tryLockFuture = null; - lockWaiter = waiter; - waitForAcquire(waiter, acquirePromise); } + tryLockFuture = null; + lockWaiter = waiter; + waitForAcquire(waiter, acquirePromise); + } - @Override - public void onFailure(Throwable cause) { - FutureUtils.setException(acquirePromise, cause); - } - }, lockStateExecutor, lockPath)); + @Override + public void onFailure(Throwable cause) { + FutureUtils.completeExceptionally(acquirePromise, cause); + } + }, lockStateExecutor.chooseExecutor(lockPath)); } void waitForAcquire(final LockWaiter waiter, - final Promise<ZKDistributedLock> acquirePromise) { - waiter.getAcquireFuture().addEventListener(OrderedFutureEventListener.of( - new FutureEventListener<Boolean>() { - @Override - public void onSuccess(Boolean acquired) { - LOG.info("{} acquired lock {}", waiter, lockPath); - if (acquired) { - FutureUtils.setValue(acquirePromise, ZKDistributedLock.this); - } else { - FutureUtils.setException(acquirePromise, - new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner())); - } + final CompletableFuture<ZKDistributedLock> acquirePromise) { + waiter.getAcquireFuture().whenCompleteAsync( + new FutureEventListener<Boolean>() { + @Override + public void onSuccess(Boolean acquired) { + LOG.info("{} acquired lock {}", waiter, lockPath); + if (acquired) { + FutureUtils.complete(acquirePromise, ZKDistributedLock.this); + } else { + FutureUtils.completeExceptionally(acquirePromise, + new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner())); } + } - @Override - public void onFailure(Throwable cause) { - FutureUtils.setException(acquirePromise, cause); - } - }, lockStateExecutor, lockPath)); + @Override + public void onFailure(Throwable cause) { + FutureUtils.completeExceptionally(acquirePromise, cause); + } + }, lockStateExecutor.chooseExecutor(lockPath)); } /** @@ -300,7 +290,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock { * @throws LockingException if the lock attempt fails */ public synchronized void checkOwnershipAndReacquire() throws LockingException { - if (null == lockAcquireFuture || !lockAcquireFuture.isDefined()) { + if (null == lockAcquireFuture || !lockAcquireFuture.isDone()) { throw new LockingException(lockPath, "check ownership before acquiring"); } @@ -322,7 +312,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock { * @throws LockingException if the lock attempt fails */ public synchronized void checkOwnership() throws LockingException { - if (null == lockAcquireFuture || !lockAcquireFuture.isDefined()) { + if (null == lockAcquireFuture || !lockAcquireFuture.isDone()) { throw new LockingException(lockPath, "check ownership before acquiring"); } if (!haveLock()) { @@ -336,12 +326,12 @@ public class ZKDistributedLock implements LockListener, DistributedLock { } @VisibleForTesting - synchronized Future<ZKDistributedLock> getLockReacquireFuture() { + synchronized CompletableFuture<ZKDistributedLock> getLockReacquireFuture() { return lockReacquireFuture; } @VisibleForTesting - synchronized Future<ZKDistributedLock> getLockAcquireFuture() { + synchronized CompletableFuture<ZKDistributedLock> getLockAcquireFuture() { return lockAcquireFuture; } @@ -360,71 +350,65 @@ public class ZKDistributedLock implements LockListener, DistributedLock { } void closeWaiter(final LockWaiter waiter, - final Promise<Void> closePromise) { + final CompletableFuture<Void> closePromise) { if (null == waiter) { interruptTryLock(tryLockFuture, closePromise); } else { - waiter.getAcquireFuture().addEventListener(OrderedFutureEventListener.of( - new FutureEventListener<Boolean>() { - @Override - public void onSuccess(Boolean value) { - unlockInternalLock(closePromise); - } - @Override - public void onFailure(Throwable cause) { - unlockInternalLock(closePromise); - } - }, lockStateExecutor, lockPath)); - FutureUtils.cancel(waiter.getAcquireFuture()); + waiter.getAcquireFuture().whenCompleteAsync( + new FutureEventListener<Boolean>() { + @Override + public void onSuccess(Boolean value) { + unlockInternalLock(closePromise); + } + @Override + public void onFailure(Throwable cause) { + unlockInternalLock(closePromise); + } + }, lockStateExecutor.chooseExecutor(lockPath)); + waiter.getAcquireFuture().cancel(true); } } - void interruptTryLock(final Future<LockWaiter> tryLockFuture, - final Promise<Void> closePromise) { + void interruptTryLock(final CompletableFuture<LockWaiter> tryLockFuture, + final CompletableFuture<Void> closePromise) { if (null == tryLockFuture) { unlockInternalLock(closePromise); } else { - tryLockFuture.addEventListener(OrderedFutureEventListener.of( - new FutureEventListener<LockWaiter>() { - @Override - public void onSuccess(LockWaiter waiter) { - closeWaiter(waiter, closePromise); - } - @Override - public void onFailure(Throwable cause) { - unlockInternalLock(closePromise); - } - }, lockStateExecutor, lockPath)); - FutureUtils.cancel(tryLockFuture); + tryLockFuture.whenCompleteAsync( + new FutureEventListener<LockWaiter>() { + @Override + public void onSuccess(LockWaiter waiter) { + closeWaiter(waiter, closePromise); + } + @Override + public void onFailure(Throwable cause) { + unlockInternalLock(closePromise); + } + }, lockStateExecutor.chooseExecutor(lockPath)); + tryLockFuture.cancel(true); } } - synchronized void unlockInternalLock(final Promise<Void> closePromise) { + synchronized void unlockInternalLock(final CompletableFuture<Void> closePromise) { if (internalLock == null) { - FutureUtils.setValue(closePromise, null); + FutureUtils.complete(closePromise, null); } else { - internalLock.asyncUnlock().ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - FutureUtils.setValue(closePromise, null); - return BoxedUnit.UNIT; - } - }); + internalLock.asyncUnlock().whenComplete((value, cause) -> closePromise.complete(null)); } } @Override - public Future<Void> asyncClose() { - final Promise<Void> closePromise; + public CompletableFuture<Void> asyncClose() { + final CompletableFuture<Void> closePromise; synchronized (this) { if (closed) { return closeFuture; } closed = true; - closeFuture = closePromise = new Promise<Void>(); + closeFuture = closePromise = new CompletableFuture<Void>(); } - final Promise<Void> closeWaiterFuture = new Promise<Void>(); - closeWaiterFuture.addEventListener(OrderedFutureEventListener.of(new FutureEventListener<Void>() { + final CompletableFuture<Void> closeWaiterFuture = new CompletableFuture<Void>(); + closeWaiterFuture.whenCompleteAsync(new FutureEventListener<Void>() { @Override public void onSuccess(Void value) { complete(); @@ -435,9 +419,9 @@ public class ZKDistributedLock implements LockListener, DistributedLock { } private void complete() { - FutureUtils.setValue(closePromise, null); + FutureUtils.complete(closePromise, null); } - }, lockStateExecutor, lockPath)); + }, lockStateExecutor.chooseExecutor(lockPath)); lockStateExecutor.submit(lockPath, new Runnable() { @Override public void run() { @@ -449,7 +433,7 @@ public class ZKDistributedLock implements LockListener, DistributedLock { void internalReacquireLock(final AtomicInteger numRetries, final long lockTimeout, - final Promise<ZKDistributedLock> reacquirePromise) { + final CompletableFuture<ZKDistributedLock> reacquirePromise) { lockStateExecutor.submit(lockPath, new Runnable() { @Override public void run() { @@ -460,25 +444,25 @@ public class ZKDistributedLock implements LockListener, DistributedLock { void doInternalReacquireLock(final AtomicInteger numRetries, final long lockTimeout, - final Promise<ZKDistributedLock> reacquirePromise) { + final CompletableFuture<ZKDistributedLock> reacquirePromise) { internalTryRetries.inc(); - Promise<ZKDistributedLock> tryPromise = new Promise<ZKDistributedLock>(); - tryPromise.addEventListener(new FutureEventListener<ZKDistributedLock>() { + CompletableFuture<ZKDistributedLock> tryPromise = new CompletableFuture<ZKDistributedLock>(); + tryPromise.whenComplete(new FutureEventListener<ZKDistributedLock>() { @Override public void onSuccess(ZKDistributedLock lock) { - FutureUtils.setValue(reacquirePromise, lock); + FutureUtils.complete(reacquirePromise, lock); } @Override public void onFailure(Throwable cause) { if (cause instanceof OwnershipAcquireFailedException) { // the lock has been acquired by others - FutureUtils.setException(reacquirePromise, cause); + FutureUtils.completeExceptionally(reacquirePromise, cause); } else { if (numRetries.getAndDecrement() > 0 && !closed) { internalReacquireLock(numRetries, lockTimeout, reacquirePromise); } else { - FutureUtils.setException(reacquirePromise, cause); + FutureUtils.completeExceptionally(reacquirePromise, cause); } } } @@ -486,9 +470,9 @@ public class ZKDistributedLock implements LockListener, DistributedLock { doAsyncAcquireWithSemaphore(tryPromise, 0); } - private Future<ZKDistributedLock> reacquireLock(boolean throwLockAcquireException) throws LockingException { + private CompletableFuture<ZKDistributedLock> reacquireLock(boolean throwLockAcquireException) throws LockingException { final Stopwatch stopwatch = Stopwatch.createStarted(); - Promise<ZKDistributedLock> lockPromise; + CompletableFuture<ZKDistributedLock> lockPromise; synchronized (this) { if (closed) { throw newLockClosedException(); @@ -504,8 +488,8 @@ public class ZKDistributedLock implements LockListener, DistributedLock { return lockReacquireFuture; } LOG.info("reacquiring lock at {}", lockPath); - lockReacquireFuture = lockPromise = new Promise<ZKDistributedLock>(); - lockReacquireFuture.addEventListener(new FutureEventListener<ZKDistributedLock>() { + lockReacquireFuture = lockPromise = new CompletableFuture<ZKDistributedLock>(); + lockReacquireFuture.whenComplete(new FutureEventListener<ZKDistributedLock>() { @Override public void onSuccess(ZKDistributedLock lock) { // if re-acquire successfully, clear the state.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java index e2699e7..9fdcbf1 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java @@ -17,8 +17,22 @@ */ package org.apache.distributedlog.lock; +import static com.google.common.base.Charsets.UTF_8; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import org.apache.distributedlog.DistributedLogConstants; import org.apache.distributedlog.util.FailpointUtils; import org.apache.distributedlog.exceptions.LockingException; @@ -27,18 +41,10 @@ import org.apache.distributedlog.exceptions.DLInterruptedException; import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; import org.apache.distributedlog.exceptions.UnexpectedException; import org.apache.distributedlog.exceptions.ZKException; -import org.apache.distributedlog.stats.OpStatsListener; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.stats.OpStatsListener; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Function0; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import com.twitter.util.Return; -import com.twitter.util.Throw; -import com.twitter.util.TimeoutException; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -54,20 +60,6 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static com.google.common.base.Charsets.UTF_8; /** * A lock under a given zookeeper session. This is a one-time lock. @@ -276,7 +268,7 @@ class ZKSessionLock implements SessionLock { private StateManagement lockState; private final DistributedLockContext lockContext; - private final Promise<Boolean> acquireFuture; + private final CompletableFuture<Boolean> acquireFuture; private String currentId; private String currentNode; private String watchedNode; @@ -342,15 +334,14 @@ class ZKSessionLock implements SessionLock { this.unlockStats = statsLogger.getOpStatsLogger("unlock"); // Attach interrupt handler to acquire future so clients can abort the future. - this.acquireFuture = new Promise<Boolean>(new com.twitter.util.Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable t) { + this.acquireFuture = FutureUtils.createFuture(); + this.acquireFuture.whenComplete((value, cause) -> { + if (null != cause) { // This will set the lock state to closed, and begin to cleanup the zk lock node. // We have to be careful not to block here since doing so blocks the ordered lock // state executor which can cause deadlocks depending on how futures are chained. - ZKSessionLock.this.asyncUnlock(t); + ZKSessionLock.this.asyncUnlock(cause); // Note re. logging and exceptions: errors are already logged by unlockAsync. - return BoxedUnit.UNIT; } }); } @@ -433,7 +424,7 @@ class ZKSessionLock implements SessionLock { * @param promise * promise */ - protected <T> void executeLockAction(final int lockEpoch, final LockAction func, final Promise<T> promise) { + protected <T> void executeLockAction(final int lockEpoch, final LockAction func, final CompletableFuture<T> promise) { lockStateExecutor.submit(lockPath, new SafeRunnable() { @Override public void safeRun() { @@ -453,7 +444,7 @@ class ZKSessionLock implements SessionLock { LOG.trace("{} skipped executing lock action '{}' for lock {}, since epoch is changed from {} to {}.", new Object[]{lockId, func.getActionName(), lockPath, lockEpoch, currentEpoch}); } - promise.setException(new EpochChangedException(lockPath, lockEpoch, currentEpoch)); + promise.completeExceptionally(new EpochChangedException(lockPath, lockEpoch, currentEpoch)); } } }); @@ -516,7 +507,7 @@ class ZKSessionLock implements SessionLock { * node name * @return client id and its ephemeral owner. */ - static Future<Pair<String, Long>> asyncParseClientID(ZooKeeper zkClient, String lockPath, String nodeName) { + static CompletableFuture<Pair<String, Long>> asyncParseClientID(ZooKeeper zkClient, String lockPath, String nodeName) { String[] parts = nodeName.split("_"); // member_<clientid>_s<owner_session>_ if (4 == parts.length && parts[2].startsWith("s")) { @@ -524,19 +515,19 @@ class ZKSessionLock implements SessionLock { String clientId; try { clientId = URLDecoder.decode(parts[1], UTF_8.name()); - return Future.value(Pair.of(clientId, sessionOwner)); + return FutureUtils.value(Pair.of(clientId, sessionOwner)); } catch (UnsupportedEncodingException e) { // if failed to parse client id, we have to get client id by zookeeper#getData. } } - final Promise<Pair<String, Long>> promise = new Promise<Pair<String, Long>>(); + final CompletableFuture<Pair<String, Long>> promise = new CompletableFuture<Pair<String, Long>>(); zkClient.getData(lockPath + "/" + nodeName, false, new AsyncCallback.DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if (KeeperException.Code.OK.intValue() != rc) { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); } else { - promise.setValue(Pair.of(deserializeClientId(data), stat.getEphemeralOwner())); + promise.complete(Pair.of(deserializeClientId(data), stat.getEphemeralOwner())); } } }, null); @@ -544,8 +535,8 @@ class ZKSessionLock implements SessionLock { } @Override - public Future<LockWaiter> asyncTryLock(final long timeout, final TimeUnit unit) { - final Promise<String> result = new Promise<String>(); + public CompletableFuture<LockWaiter> asyncTryLock(final long timeout, final TimeUnit unit) { + final CompletableFuture<String> result = new CompletableFuture<String>(); final boolean wait = DistributedLogConstants.LOCK_IMMEDIATE != timeout; if (wait) { asyncTryLock(wait, result); @@ -559,11 +550,11 @@ class ZKSessionLock implements SessionLock { @Override public void safeRun() { if (!lockState.inState(State.INIT)) { - result.setException(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState())); + result.completeExceptionally(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState())); return; } if (KeeperException.Code.OK.intValue() != rc) { - result.setException(KeeperException.create(KeeperException.Code.get(rc))); + result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); return; } @@ -571,25 +562,20 @@ class ZKSessionLock implements SessionLock { Collections.sort(children, MEMBER_COMPARATOR); if (children.size() > 0) { - asyncParseClientID(zk, lockPath, children.get(0)).addEventListener( + asyncParseClientID(zk, lockPath, children.get(0)).whenCompleteAsync( new FutureEventListener<Pair<String, Long>>() { @Override public void onSuccess(Pair<String, Long> owner) { if (!checkOrClaimLockOwner(owner, result)) { - acquireFuture.updateIfEmpty(new Return<Boolean>(false)); + acquireFuture.complete(false); } } @Override public void onFailure(final Throwable cause) { - lockStateExecutor.submit(lockPath, new SafeRunnable() { - @Override - public void safeRun() { - result.setException(cause); - } - }); + result.completeExceptionally(cause); } - }); + }, lockStateExecutor.chooseExecutor(lockPath)); } else { asyncTryLock(wait, result); } @@ -599,14 +585,9 @@ class ZKSessionLock implements SessionLock { }, null); } - final Promise<Boolean> waiterAcquireFuture = new Promise<Boolean>(new com.twitter.util.Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable t) { - acquireFuture.raise(t); - return BoxedUnit.UNIT; - } - }); - return result.map(new AbstractFunction1<String, LockWaiter>() { + final CompletableFuture<Boolean> waiterAcquireFuture = FutureUtils.createFuture(); + waiterAcquireFuture.whenComplete((value, cause) -> acquireFuture.completeExceptionally(cause)); + return result.thenApply(new Function<String, LockWaiter>() { @Override public LockWaiter apply(final String currentOwner) { final Exception acquireException = new OwnershipAcquireFailedException(lockPath, currentOwner); @@ -617,7 +598,7 @@ class ZKSessionLock implements SessionLock { acquireException, lockStateExecutor, lockPath - ).addEventListener(new FutureEventListener<Boolean>() { + ).whenComplete(new FutureEventListener<Boolean>() { @Override public void onSuccess(Boolean acquired) { @@ -631,17 +612,17 @@ class ZKSessionLock implements SessionLock { private void completeOrFail(final Throwable acquireCause) { if (isLockHeld()) { - waiterAcquireFuture.setValue(true); + waiterAcquireFuture.complete(true); } else { - asyncUnlock().addEventListener(new FutureEventListener<BoxedUnit>() { + asyncUnlock().whenComplete(new FutureEventListener<Void>() { @Override - public void onSuccess(BoxedUnit value) { - waiterAcquireFuture.setException(acquireCause); + public void onSuccess(Void value) { + waiterAcquireFuture.completeExceptionally(acquireCause); } @Override public void onFailure(Throwable cause) { - waiterAcquireFuture.setException(acquireCause); + waiterAcquireFuture.completeExceptionally(acquireCause); } }); } @@ -656,12 +637,12 @@ class ZKSessionLock implements SessionLock { } private boolean checkOrClaimLockOwner(final Pair<String, Long> currentOwner, - final Promise<String> result) { + final CompletableFuture<String> result) { if (lockId.compareTo(currentOwner) != 0 && !lockContext.hasLockId(currentOwner)) { lockStateExecutor.submit(lockPath, new SafeRunnable() { @Override public void safeRun() { - result.setValue(currentOwner.getLeft()); + result.complete(currentOwner.getLeft()); } }); return false; @@ -672,7 +653,7 @@ class ZKSessionLock implements SessionLock { @Override public void execute() { if (!lockState.inState(State.INIT)) { - result.setException(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState())); + result.completeExceptionally(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState())); return; } asyncTryLock(false, result); @@ -693,12 +674,12 @@ class ZKSessionLock implements SessionLock { * @param result * promise to satisfy with current lock owner */ - private void asyncTryLock(boolean wait, final Promise<String> result) { - final Promise<String> lockResult = new Promise<String>(); - lockResult.addEventListener(new FutureEventListener<String>() { + private void asyncTryLock(boolean wait, final CompletableFuture<String> result) { + final CompletableFuture<String> lockResult = new CompletableFuture<String>(); + lockResult.whenComplete(new FutureEventListener<String>() { @Override public void onSuccess(String currentOwner) { - result.setValue(currentOwner); + result.complete(currentOwner); } @Override @@ -707,7 +688,7 @@ class ZKSessionLock implements SessionLock { if (lockCause instanceof LockStateChangedException) { LOG.info("skipping cleanup for {} at {} after encountering lock " + "state change exception : ", new Object[] { lockId, lockPath, lockCause }); - result.setException(lockCause); + result.completeExceptionally(lockCause); return; } if (LOG.isDebugEnabled()) { @@ -716,15 +697,15 @@ class ZKSessionLock implements SessionLock { } // If we encountered any exception we should cleanup - Future<BoxedUnit> unlockResult = asyncUnlock(); - unlockResult.addEventListener(new FutureEventListener<BoxedUnit>() { + CompletableFuture<Void> unlockResult = asyncUnlock(); + unlockResult.whenComplete(new FutureEventListener<Void>() { @Override - public void onSuccess(BoxedUnit value) { - result.setException(lockCause); + public void onSuccess(Void value) { + result.completeExceptionally(lockCause); } @Override public void onFailure(Throwable cause) { - result.setException(lockCause); + result.completeExceptionally(lockCause); } }); } @@ -734,7 +715,7 @@ class ZKSessionLock implements SessionLock { /** * Try lock. If wait is true, it would wait and watch sibling to acquire lock when - * the sibling is dead. <i>acquireFuture</i> will be notified either it locked successfully + * the sibling is dead. <i>acquireCompletableFuture</i> will be notified either it locked successfully * or the lock failed. The promise will only satisfy with current lock owner. * * NOTE: the <i>promise</i> is only satisfied on <i>lockStateExecutor</i>, so any @@ -745,12 +726,12 @@ class ZKSessionLock implements SessionLock { * @param promise * promise to satisfy with current lock owner. */ - private void asyncTryLockWithoutCleanup(final boolean wait, final Promise<String> promise) { + private void asyncTryLockWithoutCleanup(final boolean wait, final CompletableFuture<String> promise) { executeLockAction(epoch.get(), new LockAction() { @Override public void execute() { if (!lockState.inState(State.INIT)) { - promise.setException(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState())); + promise.completeExceptionally(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState())); return; } lockState.transition(State.PREPARING); @@ -776,7 +757,7 @@ class ZKSessionLock implements SessionLock { public void execute() { if (KeeperException.Code.OK.intValue() != rc) { KeeperException ke = KeeperException.create(KeeperException.Code.get(rc)); - promise.setException(ke); + promise.completeExceptionally(ke); return; } @@ -797,14 +778,12 @@ class ZKSessionLock implements SessionLock { if (lockState.isExpiredOrClosing()) { // Delete node attempt may have come after PREPARING but before create node, in which case // we'd be left with a dangling node unless we clean up. - Promise<BoxedUnit> deletePromise = new Promise<BoxedUnit>(); + CompletableFuture<Void> deletePromise = new CompletableFuture<Void>(); deleteLockNode(deletePromise); - deletePromise.ensure(new Function0<BoxedUnit>() { - public BoxedUnit apply() { - promise.setException(new LockClosedException(lockPath, lockId, lockState.getState())); - return BoxedUnit.UNIT; - } - }); + FutureUtils.ensure( + deletePromise, + () -> promise.completeExceptionally( + new LockClosedException(lockPath, lockId, lockState.getState()))); return; } @@ -830,7 +809,7 @@ class ZKSessionLock implements SessionLock { @Override public void tryLock(long timeout, TimeUnit unit) throws LockingException { final Stopwatch stopwatch = Stopwatch.createStarted(); - Future<LockWaiter> tryFuture = asyncTryLock(timeout, unit); + CompletableFuture<LockWaiter> tryFuture = asyncTryLock(timeout, unit); LockWaiter waiter = waitForTry(stopwatch, tryFuture); boolean acquired = waiter.waitForAcquireQuietly(); if (!acquired) { @@ -838,13 +817,13 @@ class ZKSessionLock implements SessionLock { } } - synchronized LockWaiter waitForTry(Stopwatch stopwatch, Future<LockWaiter> tryFuture) + synchronized LockWaiter waitForTry(Stopwatch stopwatch, CompletableFuture<LockWaiter> tryFuture) throws LockingException { boolean success = false; boolean stateChanged = false; LockWaiter waiter; try { - waiter = Await.result(tryFuture, Duration.fromMilliseconds(lockOpTimeout)); + waiter = FutureUtils.result(tryFuture, lockOpTimeout, TimeUnit.MILLISECONDS); success = true; } catch (LockStateChangedException ex) { stateChanged = true; @@ -873,12 +852,12 @@ class ZKSessionLock implements SessionLock { } @Override - public Future<BoxedUnit> asyncUnlock() { + public CompletableFuture<Void> asyncUnlock() { return asyncUnlock(new LockClosedException(lockPath, lockId, lockState.getState())); } - Future<BoxedUnit> asyncUnlock(final Throwable cause) { - final Promise<BoxedUnit> promise = new Promise<BoxedUnit>(); + CompletableFuture<Void> asyncUnlock(final Throwable cause) { + final CompletableFuture<Void> promise = new CompletableFuture<Void>(); // Use lock executor here rather than lock action, because we want this opertaion to be applied // whether the epoch has changed or not. The member node is EPHEMERAL_SEQUENTIAL so there's no @@ -886,9 +865,9 @@ class ZKSessionLock implements SessionLock { lockStateExecutor.submit(lockPath, new SafeRunnable() { @Override public void safeRun() { - acquireFuture.updateIfEmpty(new Throw<Boolean>(cause)); + acquireFuture.completeExceptionally(cause); unlockInternal(promise); - promise.addEventListener(new OpStatsListener<BoxedUnit>(unlockStats)); + promise.whenComplete(new OpStatsListener<Void>(unlockStats)); } }); @@ -897,9 +876,9 @@ class ZKSessionLock implements SessionLock { @Override public void unlock() { - Future<BoxedUnit> unlockResult = asyncUnlock(); + CompletableFuture<Void> unlockResult = asyncUnlock(); try { - Await.result(unlockResult, Duration.fromMilliseconds(lockOpTimeout)); + FutureUtils.result(unlockResult, lockOpTimeout, TimeUnit.MILLISECONDS); } catch (TimeoutException toe) { // This shouldn't happen unless we lose a watch, and may result in a leaked lock. LOG.error("Timeout unlocking {} owned by {} : ", new Object[] { lockPath, lockId, toe }); @@ -921,13 +900,13 @@ class ZKSessionLock implements SessionLock { new Object[] { lockPath, System.currentTimeMillis(), lockEpoch, ZKSessionLock.this.epoch.get() }); } - acquireFuture.updateIfEmpty(new Return<Boolean>(true)); + acquireFuture.complete(true); } /** * NOTE: unlockInternal should only after try lock. */ - private void unlockInternal(final Promise<BoxedUnit> promise) { + private void unlockInternal(final CompletableFuture<Void> promise) { // already closed or expired, nothing to cleanup this.epoch.incrementAndGet(); @@ -936,7 +915,7 @@ class ZKSessionLock implements SessionLock { } if (lockState.inState(State.CLOSED)) { - promise.setValue(BoxedUnit.UNIT); + promise.complete(null); return; } @@ -951,39 +930,34 @@ class ZKSessionLock implements SessionLock { // Nothing to cleanup if INIT (never tried) or EXPIRED (ephemeral node // auto-removed) lockState.transition(State.CLOSED); - promise.setValue(BoxedUnit.UNIT); + promise.complete(null); return; } // In any other state, we should clean up the member node - Promise<BoxedUnit> deletePromise = new Promise<BoxedUnit>(); + CompletableFuture<Void> deletePromise = new CompletableFuture<Void>(); deleteLockNode(deletePromise); // Set the state to closed after we've cleaned up - deletePromise.addEventListener(new FutureEventListener<BoxedUnit>() { + deletePromise.whenCompleteAsync(new FutureEventListener<Void>() { @Override - public void onSuccess(BoxedUnit complete) { - lockStateExecutor.submit(lockPath, new SafeRunnable() { - @Override - public void safeRun() { - lockState.transition(State.CLOSED); - promise.setValue(BoxedUnit.UNIT); - } - }); + public void onSuccess(Void complete) { + lockState.transition(State.CLOSED); + promise.complete(null); } @Override public void onFailure(Throwable cause) { // Delete failure is quite serious (causes lock leak) and should be // handled better LOG.error("lock node delete failed {} {}", lockId, lockPath); - promise.setValue(BoxedUnit.UNIT); + promise.complete(null); } - }); + }, lockStateExecutor.chooseExecutor(lockPath)); } - private void deleteLockNode(final Promise<BoxedUnit> promise) { + private void deleteLockNode(final CompletableFuture<Void> promise) { if (null == currentNode) { - promise.setValue(BoxedUnit.UNIT); + promise.complete(null); return; } @@ -1005,7 +979,7 @@ class ZKSessionLock implements SessionLock { } FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockUnlockCleanup); - promise.setValue(BoxedUnit.UNIT); + promise.complete(null); } }); } @@ -1041,8 +1015,8 @@ class ZKSessionLock implements SessionLock { // if session expired, just notify the waiter. as the lock acquire doesn't succeed. // we don't even need to clean up the lock as the znode will disappear after session expired - acquireFuture.updateIfEmpty(new Throw<Boolean>( - new LockSessionExpiredException(lockPath, lockId, lockState.getState()))); + acquireFuture.completeExceptionally( + new LockSessionExpiredException(lockPath, lockId, lockState.getState())); // session expired, ephemeral node is gone. currentNode = null; @@ -1088,9 +1062,9 @@ class ZKSessionLock implements SessionLock { }); } - private Future<String> checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher, + private CompletableFuture<String> checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher, final boolean wait) { - final Promise<String> promise = new Promise<String>(); + final CompletableFuture<String> promise = new CompletableFuture<String>(); checkLockOwnerAndWaitIfPossible(lockWatcher, wait, promise); return promise; } @@ -1107,7 +1081,7 @@ class ZKSessionLock implements SessionLock { */ private void checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher, final boolean wait, - final Promise<String> promise) { + final CompletableFuture<String> promise) { zk.getChildren(lockPath, false, new AsyncCallback.Children2Callback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { @@ -1134,22 +1108,22 @@ class ZKSessionLock implements SessionLock { final boolean wait, final int getChildrenRc, final List<String> children, - final Promise<String> promise) { + final CompletableFuture<String> promise) { executeLockAction(lockWatcher.epoch, new LockAction() { @Override public void execute() { if (!lockState.inState(State.PREPARED)) { // e.g. lock closed or session expired after prepared - promise.setException(new LockStateChangedException(lockPath, lockId, State.PREPARED, lockState.getState())); + promise.completeExceptionally(new LockStateChangedException(lockPath, lockId, State.PREPARED, lockState.getState())); return; } if (KeeperException.Code.OK.intValue() != getChildrenRc) { - promise.setException(KeeperException.create(KeeperException.Code.get(getChildrenRc))); + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(getChildrenRc))); return; } if (children.isEmpty()) { LOG.error("Error, member list is empty for lock {}.", lockPath); - promise.setException(new UnexpectedException("Empty member list for lock " + lockPath)); + promise.completeExceptionally(new UnexpectedException("Empty member list for lock " + lockPath)); return; } @@ -1164,10 +1138,10 @@ class ZKSessionLock implements SessionLock { if (memberIndex == 0) { LOG.info("{} acquired the lock {}.", cid, lockPath); claimOwnership(lockWatcher.epoch); - promise.setValue(cid); + promise.complete(cid); } else if (memberIndex > 0) { // we are in the member list but we didn't hold the lock // get ownership of current owner - asyncParseClientID(zk, lockPath, children.get(0)).addEventListener(new FutureEventListener<Pair<String, Long>>() { + asyncParseClientID(zk, lockPath, children.get(0)).whenComplete(new FutureEventListener<Pair<String, Long>>() { @Override public void onSuccess(Pair<String, Long> currentOwner) { watchLockOwner(lockWatcher, wait, @@ -1179,7 +1153,7 @@ class ZKSessionLock implements SessionLock { executeLockAction(lockWatcher.epoch, new LockAction() { @Override public void execute() { - promise.setException(cause); + promise.completeExceptionally(cause); } @Override @@ -1192,7 +1166,7 @@ class ZKSessionLock implements SessionLock { } else { LOG.error("Member {} doesn't exist in the members list {} for lock {}.", new Object[]{ cid, children, lockPath}); - promise.setException( + promise.completeExceptionally( new UnexpectedException("Member " + cid + " doesn't exist in member list " + children + " for lock " + lockPath)); } @@ -1229,7 +1203,7 @@ class ZKSessionLock implements SessionLock { final String siblingNode, final String ownerNode, final Pair<String, Long> currentOwner, - final Promise<String> promise) { + final CompletableFuture<String> promise) { executeLockAction(lockWatcher.epoch, new LockAction() { @Override public void execute() { @@ -1270,7 +1244,7 @@ class ZKSessionLock implements SessionLock { @Override public void execute() { if (!lockState.inState(State.PREPARED)) { - promise.setException(new LockStateChangedException(lockPath, lockId, State.PREPARED, lockState.getState())); + promise.completeExceptionally(new LockStateChangedException(lockPath, lockId, State.PREPARED, lockState.getState())); return; } @@ -1280,17 +1254,17 @@ class ZKSessionLock implements SessionLock { LOG.info("LockWatcher {} claimed ownership for {} after set watcher on {}.", new Object[]{ myNode, lockPath, ownerNode }); claimOwnership(lockWatcher.epoch); - promise.setValue(currentOwner.getLeft()); + promise.complete(currentOwner.getLeft()); } else { // watch sibling successfully lockState.transition(State.WAITING); - promise.setValue(currentOwner.getLeft()); + promise.complete(currentOwner.getLeft()); } } else if (KeeperException.Code.NONODE.intValue() == rc) { // sibling just disappeared, it might be the chance to claim ownership checkLockOwnerAndWaitIfPossible(lockWatcher, wait, promise); } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); } } @@ -1305,7 +1279,7 @@ class ZKSessionLock implements SessionLock { } }, null); } else { - promise.setValue(currentOwner.getLeft()); + promise.complete(currentOwner.getLeft()); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLockFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLockFactory.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLockFactory.java index 3cb25f0..223a3a4 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLockFactory.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLockFactory.java @@ -17,20 +17,17 @@ */ package org.apache.distributedlog.lock; -import org.apache.distributedlog.ZooKeeperClient; -import org.apache.distributedlog.exceptions.DLInterruptedException; -import org.apache.distributedlog.util.OrderedScheduler; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import com.twitter.util.Return; -import com.twitter.util.Throw; -import org.apache.bookkeeper.stats.StatsLogger; -import scala.runtime.BoxedUnit; - import java.io.IOException; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.distributedlog.ZooKeeperClient; +import org.apache.distributedlog.exceptions.DLInterruptedException; +import org.apache.distributedlog.common.concurrent.FutureUtils; +import org.apache.distributedlog.util.OrderedScheduler; +import org.apache.bookkeeper.stats.StatsLogger; /** * Factory to create zookeeper based locks. @@ -65,16 +62,14 @@ public class ZKSessionLockFactory implements SessionLockFactory { } @Override - public Future<SessionLock> createLock(String lockPath, - DistributedLockContext context) { + public CompletableFuture<SessionLock> createLock(String lockPath, + DistributedLockContext context) { AtomicInteger numRetries = new AtomicInteger(lockCreationRetries); final AtomicReference<Throwable> interruptedException = new AtomicReference<Throwable>(null); - Promise<SessionLock> createPromise = - new Promise<SessionLock>(new com.twitter.util.Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable t) { - interruptedException.set(t); - return BoxedUnit.UNIT; + CompletableFuture<SessionLock> createPromise = FutureUtils.createFuture(); + createPromise.whenComplete((value, cause) -> { + if (null != cause && cause instanceof CancellationException) { + interruptedException.set(cause); } }); createLock( @@ -91,13 +86,13 @@ public class ZKSessionLockFactory implements SessionLockFactory { final DistributedLockContext context, final AtomicReference<Throwable> interruptedException, final AtomicInteger numRetries, - final Promise<SessionLock> createPromise, + final CompletableFuture<SessionLock> createPromise, final long delayMs) { lockStateExecutor.schedule(lockPath, new Runnable() { @Override public void run() { if (null != interruptedException.get()) { - createPromise.updateIfEmpty(new Throw<SessionLock>(interruptedException.get())); + createPromise.completeExceptionally(interruptedException.get()); return; } try { @@ -109,14 +104,14 @@ public class ZKSessionLockFactory implements SessionLockFactory { lockOpTimeout, lockStatsLogger, context); - createPromise.updateIfEmpty(new Return<SessionLock>(lock)); + createPromise.complete(lock); } catch (DLInterruptedException dlie) { // if the creation is interrupted, throw the exception without retrie. - createPromise.updateIfEmpty(new Throw<SessionLock>(dlie)); + createPromise.completeExceptionally(dlie); return; } catch (IOException e) { if (numRetries.getAndDecrement() < 0) { - createPromise.updateIfEmpty(new Throw<SessionLock>(e)); + createPromise.completeExceptionally(e); return; } createLock( http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryReader.java index 8440509..1b292e3 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryReader.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryReader.java @@ -18,12 +18,11 @@ package org.apache.distributedlog.logsegment; import com.google.common.annotations.Beta; +import java.util.List; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.Entry; import org.apache.distributedlog.LogSegmentMetadata; import org.apache.distributedlog.io.AsyncCloseable; -import com.twitter.util.Future; - -import java.util.List; /** * An interface class to read the enveloped entry (serialized bytes of @@ -87,7 +86,7 @@ public interface LogSegmentEntryReader extends AsyncCloseable { * @throws {@link org.apache.distributedlog.exceptions.EndOfLogSegmentException} when * read entries beyond the end of a <i>closed</i> log segment. */ - Future<List<Entry.Reader>> readNext(int numEntries); + CompletableFuture<List<Entry.Reader>> readNext(int numEntries); /** * Return the last add confirmed entry id (LAC). http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java index 40be67b..ab2d898 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java @@ -18,12 +18,11 @@ package org.apache.distributedlog.logsegment; import com.google.common.annotations.Beta; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.LogSegmentMetadata; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; import org.apache.distributedlog.metadata.LogMetadataForWriter; import org.apache.distributedlog.util.Allocator; -import org.apache.distributedlog.util.Transaction; -import com.twitter.util.Future; import java.io.IOException; @@ -39,7 +38,7 @@ public interface LogSegmentEntryStore { * @param segment log segment metadata * @return future represent the delete result */ - Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment); + CompletableFuture<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment); /** * Create a new log segment allocator for allocating log segment entry writers. @@ -58,7 +57,7 @@ public interface LogSegmentEntryStore { * @param startEntryId the start entry id * @return future represent the opened reader */ - Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment, + CompletableFuture<LogSegmentEntryReader> openReader(LogSegmentMetadata segment, long startEntryId); /** @@ -68,6 +67,6 @@ public interface LogSegmentEntryStore { * @param fence the flag to fence log segment * @return future represent the opened random access reader */ - Future<LogSegmentRandomAccessEntryReader> openRandomAccessReader(LogSegmentMetadata segment, + CompletableFuture<LogSegmentRandomAccessEntryReader> openRandomAccessReader(LogSegmentMetadata segment, boolean fence); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java index 660592e..70f0da0 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryWriter.java @@ -19,7 +19,7 @@ package org.apache.distributedlog.logsegment; import com.google.common.annotations.Beta; import org.apache.distributedlog.Entry; -import org.apache.distributedlog.util.Sizable; +import org.apache.distributedlog.common.util.Sizable; import org.apache.bookkeeper.client.AsyncCallback; /** http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentMetadataStore.java index fdf72b1..a58264c 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentMetadataStore.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentMetadataStore.java @@ -18,15 +18,15 @@ package org.apache.distributedlog.logsegment; import com.google.common.annotations.Beta; +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Versioned; import org.apache.distributedlog.LogSegmentMetadata; import org.apache.distributedlog.callback.LogSegmentNamesListener; import org.apache.distributedlog.metadata.LogMetadata; import org.apache.distributedlog.metadata.LogMetadataForWriter; import org.apache.distributedlog.util.Transaction; import org.apache.distributedlog.util.Transaction.OpListener; -import com.twitter.util.Future; -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; import java.io.Closeable; import java.util.List; @@ -135,7 +135,7 @@ public interface LogSegmentMetadataStore extends Closeable { * path to store log segment metadata * @return future of the retrieved log segment metadata */ - Future<LogSegmentMetadata> getLogSegment(String logSegmentPath); + CompletableFuture<LogSegmentMetadata> getLogSegment(String logSegmentPath); /** * Retrieve the list of log segments under <code>logSegmentsPath</code> and register a <i>listener</i> @@ -147,8 +147,8 @@ public interface LogSegmentMetadataStore extends Closeable { * log segment listener on log segment changes * @return future of the retrieved list of log segment names */ - Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath, - LogSegmentNamesListener listener); + CompletableFuture<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath, + LogSegmentNamesListener listener); /** * Unregister a log segment <code>listener</code> on log segment changes under <code>logSegmentsPath</code>. http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java index 948ce30..23c784e 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java @@ -17,11 +17,10 @@ */ package org.apache.distributedlog.logsegment; +import java.util.List; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.Entry; import org.apache.distributedlog.io.AsyncCloseable; -import com.twitter.util.Future; - -import java.util.List; /** * An interface class to read entries {@link org.apache.distributedlog.Entry} @@ -36,7 +35,7 @@ public interface LogSegmentRandomAccessEntryReader extends AsyncCloseable { * @param endEntryId end entry id * @return A promise that when satisfied will contain a list of entries of [startEntryId, endEntryId]. */ - Future<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId); + CompletableFuture<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId); /** * Return the last add confirmed entry id (LAC). http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentWriter.java index 39c94f4..c483403 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentWriter.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentWriter.java @@ -18,15 +18,14 @@ package org.apache.distributedlog.logsegment; import com.google.common.annotations.Beta; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.LogRecord; import org.apache.distributedlog.exceptions.BKTransmitException; import org.apache.distributedlog.exceptions.LockingException; import org.apache.distributedlog.io.AsyncAbortable; import org.apache.distributedlog.io.AsyncCloseable; -import com.twitter.util.Future; - -import java.io.IOException; /** * An interface class to write log records into a log segment. @@ -53,7 +52,7 @@ public interface LogSegmentWriter extends AsyncCloseable, AsyncAbortable { * @throws BKTransmitException if failed to transmit data to bk * @throws org.apache.distributedlog.exceptions.WriteException if failed to write to bk */ - public Future<DLSN> asyncWrite(LogRecord record); + public CompletableFuture<DLSN> asyncWrite(LogRecord record); /** * This isn't a simple synchronous version of {@code asyncWrite}. It has different semantic. @@ -74,7 +73,7 @@ public interface LogSegmentWriter extends AsyncCloseable, AsyncAbortable { * * @return future representing the transmit result with last acknowledged transaction id. */ - public Future<Long> flush(); + public CompletableFuture<Long> flush(); /** * Commit the current acknowledged data. It is the consequent operation of {@link #flush()}, @@ -82,6 +81,6 @@ public interface LogSegmentWriter extends AsyncCloseable, AsyncAbortable { * * @return future representing the commit result. */ - public Future<Long> commit(); + public CompletableFuture<Long> commit(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/RollingPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/RollingPolicy.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/RollingPolicy.java index ce98ac9..4844ad4 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/RollingPolicy.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/RollingPolicy.java @@ -17,7 +17,7 @@ */ package org.apache.distributedlog.logsegment; -import org.apache.distributedlog.util.Sizable; +import org.apache.distributedlog.common.util.Sizable; public interface RollingPolicy { /** http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/SizeBasedRollingPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/SizeBasedRollingPolicy.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/SizeBasedRollingPolicy.java index 544f58e..1c3794a 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/SizeBasedRollingPolicy.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/SizeBasedRollingPolicy.java @@ -17,7 +17,7 @@ */ package org.apache.distributedlog.logsegment; -import org.apache.distributedlog.util.Sizable; +import org.apache.distributedlog.common.util.Sizable; public class SizeBasedRollingPolicy implements RollingPolicy { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/TimeBasedRollingPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/TimeBasedRollingPolicy.java b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/TimeBasedRollingPolicy.java index 141f139..80c09be 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/TimeBasedRollingPolicy.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/TimeBasedRollingPolicy.java @@ -17,7 +17,7 @@ */ package org.apache.distributedlog.logsegment; -import org.apache.distributedlog.util.Sizable; +import org.apache.distributedlog.common.util.Sizable; import org.apache.distributedlog.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DLMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DLMetadata.java b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DLMetadata.java index 7339d55..948f2bf 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DLMetadata.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DLMetadata.java @@ -18,6 +18,7 @@ package org.apache.distributedlog.metadata; import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.exceptions.ZKException; import org.apache.distributedlog.impl.metadata.BKDLConfig; import org.apache.distributedlog.util.Utils; import org.apache.distributedlog.ZooKeeperClient; @@ -144,13 +145,9 @@ public class DLMetadata { byte[] data = serialize(); try { Utils.zkCreateFullPathOptimistic(zkc, uri.getPath(), data, - zkc.getDefaultACL(), CreateMode.PERSISTENT); - } catch (KeeperException e) { - throw new IOException("Fail to write dl metadata " + new String(data, UTF_8) - + " to uri " + uri, e); - } catch (InterruptedException e) { - throw new IOException("Interrupted when writing dl metadata " + new String(data, UTF_8) - + " to uri " + uri, e); + zkc.getDefaultACL(), CreateMode.PERSISTENT); + } catch (KeeperException ke) { + throw new ZKException("Encountered zookeeper exception on creating dl metadata", ke); } finally { zkc.close(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java index 6b835b9..fe52804 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java @@ -17,10 +17,11 @@ */ package org.apache.distributedlog.metadata; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.logsegment.LogSegmentMetadataStore; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.Transaction; -import com.twitter.util.Future; public class DryrunLogSegmentMetadataStoreUpdater extends LogSegmentMetadataStoreUpdater { @@ -38,8 +39,8 @@ public class DryrunLogSegmentMetadataStoreUpdater extends LogSegmentMetadataStor } @Override - public Future<Void> execute() { - return Future.Void(); + public CompletableFuture<Void> execute() { + return FutureUtils.Void(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java index f8fd777..8135678 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java @@ -19,11 +19,10 @@ package org.apache.distributedlog.metadata; import com.google.common.annotations.Beta; import com.google.common.base.Optional; -import org.apache.distributedlog.callback.NamespaceListener; -import com.twitter.util.Future; - import java.net.URI; import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.callback.NamespaceListener; /** * Interface for log metadata store. @@ -38,7 +37,7 @@ public interface LogMetadataStore { * name of the log * @return namespace location that stores this stream. */ - Future<URI> createLog(String logName); + CompletableFuture<URI> createLog(String logName); /** * Get the location of the log. @@ -47,14 +46,14 @@ public interface LogMetadataStore { * name of the log * @return namespace location that stores this stream. */ - Future<Optional<URI>> getLogLocation(String logName); + CompletableFuture<Optional<URI>> getLogLocation(String logName); /** * Retrieves logs from the namespace. * * @return iterator of logs of the namespace. */ - Future<Iterator<String>> getLogs(); + CompletableFuture<Iterator<String>> getLogs(); /** * Register a namespace listener on streams changes.