http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java index 347f041..99a4155 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java @@ -20,8 +20,11 @@ package org.apache.distributedlog.util; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; @@ -29,18 +32,18 @@ import com.google.common.base.Objects; import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.io.Closeables; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BKException; import org.apache.distributedlog.DistributedLogConstants; import org.apache.distributedlog.ZooKeeperClient; +import org.apache.distributedlog.common.concurrent.FutureUtils; +import org.apache.distributedlog.exceptions.BKTransmitException; import org.apache.distributedlog.exceptions.DLInterruptedException; -import org.apache.distributedlog.exceptions.InvalidStreamNameException; +import org.apache.distributedlog.exceptions.UnexpectedException; import org.apache.distributedlog.exceptions.ZKException; -import org.apache.distributedlog.function.VoidFunctions; +import org.apache.distributedlog.common.functions.VoidFunctions; +import org.apache.distributedlog.io.AsyncAbortable; import org.apache.distributedlog.io.AsyncCloseable; -import com.twitter.util.Await; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import com.twitter.util.Return; -import com.twitter.util.Throw; import org.apache.bookkeeper.meta.ZkVersion; import org.apache.bookkeeper.versioning.Versioned; import org.apache.zookeeper.ZooKeeper; @@ -49,17 +52,13 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.BoxedUnit; /** * Basic Utilities. */ +@Slf4j public class Utils { - private static final Logger logger = LoggerFactory.getLogger(Utils.class); - /** * Current time from some arbitrary time base in the past, counting in * nanoseconds, and not affected by settimeofday or similar system clock @@ -115,16 +114,15 @@ public class Utils { String path, byte[] data, final List<ACL> acl, - final CreateMode createMode) - throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException { + final CreateMode createMode) throws IOException, KeeperException { try { - Await.result(zkAsyncCreateFullPathOptimistic(zkc, path, data, acl, createMode)); + FutureUtils.result(zkAsyncCreateFullPathOptimistic(zkc, path, data, acl, createMode)); } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) { throw zkce; } catch (KeeperException ke) { throw ke; } catch (InterruptedException ie) { - throw ie; + throw new DLInterruptedException("Interrupted on create zookeeper path " + path, ie); } catch (RuntimeException rte) { throw rte; } catch (Exception exc) { @@ -208,7 +206,7 @@ public class Utils { * @param acl Acl of the zk path * @param createMode Create mode of zk path */ - public static Future<BoxedUnit> zkAsyncCreateFullPathOptimistic( + public static CompletableFuture<Void> zkAsyncCreateFullPathOptimistic( final ZooKeeperClient zkc, final String pathToCreate, final byte[] data, @@ -234,14 +232,14 @@ public class Utils { * @param acl Acl of the zk path * @param createMode Create mode of zk path */ - public static Future<BoxedUnit> zkAsyncCreateFullPathOptimistic( + public static CompletableFuture<Void> zkAsyncCreateFullPathOptimistic( final ZooKeeperClient zkc, final String pathToCreate, final Optional<String> parentPathShouldNotCreate, final byte[] data, final List<ACL> acl, final CreateMode createMode) { - final Promise<BoxedUnit> result = new Promise<BoxedUnit>(); + final CompletableFuture<Void> result = new CompletableFuture<Void>(); zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, parentPathShouldNotCreate, data, acl, createMode, new AsyncCallback.StringCallback() { @@ -263,13 +261,13 @@ public class Utils { * @param acl Acl of the zk path * @param createMode Create mode of zk path */ - public static Future<BoxedUnit> zkAsyncCreateFullPathOptimisticAndSetData( + public static CompletableFuture<Void> zkAsyncCreateFullPathOptimisticAndSetData( final ZooKeeperClient zkc, final String pathToCreate, final byte[] data, final List<ACL> acl, final CreateMode createMode) { - final Promise<BoxedUnit> result = new Promise<BoxedUnit>(); + final CompletableFuture<Void> result = new CompletableFuture<Void>(); try { zkc.get().setData(pathToCreate, data, -1, new AsyncCallback.StatCallback() { @@ -291,32 +289,32 @@ public class Utils { } }, result); } catch (Exception exc) { - result.setException(exc); + result.completeExceptionally(exc); } return result; } - private static void handleKeeperExceptionCode(int rc, String pathOrMessage, Promise<BoxedUnit> result) { + private static void handleKeeperExceptionCode(int rc, String pathOrMessage, CompletableFuture<Void> result) { if (KeeperException.Code.OK.intValue() == rc) { - result.setValue(BoxedUnit.UNIT); + result.complete(null); } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) { - result.setException(new ZooKeeperClient.ZooKeeperConnectionException(pathOrMessage)); + result.completeExceptionally(new ZooKeeperClient.ZooKeeperConnectionException(pathOrMessage)); } else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) { - result.setException(new DLInterruptedException(pathOrMessage)); + result.completeExceptionally(new DLInterruptedException(pathOrMessage)); } else { - result.setException(KeeperException.create(KeeperException.Code.get(rc), pathOrMessage)); + result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), pathOrMessage)); } } - public static Future<Versioned<byte[]>> zkGetData(ZooKeeperClient zkc, String path, boolean watch) { + public static CompletableFuture<Versioned<byte[]>> zkGetData(ZooKeeperClient zkc, String path, boolean watch) { ZooKeeper zk; try { zk = zkc.get(); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - return Future.exception(FutureUtils.zkException(e, path)); + return FutureUtils.exception(zkException(e, path)); } catch (InterruptedException e) { - return Future.exception(FutureUtils.zkException(e, path)); + return FutureUtils.exception(zkException(e, path)); } return zkGetData(zk, path, watch); } @@ -330,35 +328,35 @@ public class Utils { * whether to watch the path * @return future representing the versioned value. null version or null value means path doesn't exist. */ - public static Future<Versioned<byte[]>> zkGetData(ZooKeeper zk, String path, boolean watch) { - final Promise<Versioned<byte[]>> promise = new Promise<Versioned<byte[]>>(); + public static CompletableFuture<Versioned<byte[]>> zkGetData(ZooKeeper zk, String path, boolean watch) { + final CompletableFuture<Versioned<byte[]>> promise = new CompletableFuture<Versioned<byte[]>>(); zk.getData(path, watch, new AsyncCallback.DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if (KeeperException.Code.OK.intValue() == rc) { if (null == stat) { - promise.setValue(new Versioned<byte[]>(null, null)); + promise.complete(new Versioned<byte[]>(null, null)); } else { - promise.setValue(new Versioned<byte[]>(data, new ZkVersion(stat.getVersion()))); + promise.complete(new Versioned<byte[]>(data, new ZkVersion(stat.getVersion()))); } } else if (KeeperException.Code.NONODE.intValue() == rc) { - promise.setValue(new Versioned<byte[]>(null, null)); + promise.complete(new Versioned<byte[]>(null, null)); } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); } } }, null); return promise; } - public static Future<ZkVersion> zkSetData(ZooKeeperClient zkc, String path, byte[] data, ZkVersion version) { + public static CompletableFuture<ZkVersion> zkSetData(ZooKeeperClient zkc, String path, byte[] data, ZkVersion version) { ZooKeeper zk; try { zk = zkc.get(); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - return Future.exception(FutureUtils.zkException(e, path)); + return FutureUtils.exception(zkException(e, path)); } catch (InterruptedException e) { - return Future.exception(FutureUtils.zkException(e, path)); + return FutureUtils.exception(zkException(e, path)); } return zkSetData(zk, path, data, version); } @@ -376,31 +374,31 @@ public class Utils { * version used to set data * @return future representing the version after this operation. */ - public static Future<ZkVersion> zkSetData(ZooKeeper zk, String path, byte[] data, ZkVersion version) { - final Promise<ZkVersion> promise = new Promise<ZkVersion>(); + public static CompletableFuture<ZkVersion> zkSetData(ZooKeeper zk, String path, byte[] data, ZkVersion version) { + final CompletableFuture<ZkVersion> promise = new CompletableFuture<ZkVersion>(); zk.setData(path, data, version.getZnodeVersion(), new AsyncCallback.StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if (KeeperException.Code.OK.intValue() == rc) { - promise.updateIfEmpty(new Return<ZkVersion>(new ZkVersion(stat.getVersion()))); + promise.complete(new ZkVersion(stat.getVersion())); return; } - promise.updateIfEmpty(new Throw<ZkVersion>( - KeeperException.create(KeeperException.Code.get(rc)))); + promise.completeExceptionally( + KeeperException.create(KeeperException.Code.get(rc))); return; } }, null); return promise; } - public static Future<Void> zkDelete(ZooKeeperClient zkc, String path, ZkVersion version) { + public static CompletableFuture<Void> zkDelete(ZooKeeperClient zkc, String path, ZkVersion version) { ZooKeeper zk; try { zk = zkc.get(); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - return Future.exception(FutureUtils.zkException(e, path)); + return FutureUtils.exception(zkException(e, path)); } catch (InterruptedException e) { - return Future.exception(FutureUtils.zkException(e, path)); + return FutureUtils.exception(zkException(e, path)); } return zkDelete(zk, path, version); } @@ -416,17 +414,17 @@ public class Utils { * version used to set data * @return future representing the version after this operation. */ - public static Future<Void> zkDelete(ZooKeeper zk, String path, ZkVersion version) { - final Promise<Void> promise = new Promise<Void>(); + public static CompletableFuture<Void> zkDelete(ZooKeeper zk, String path, ZkVersion version) { + final CompletableFuture<Void> promise = new CompletableFuture<Void>(); zk.delete(path, version.getZnodeVersion(), new AsyncCallback.VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { if (KeeperException.Code.OK.intValue() == rc) { - promise.updateIfEmpty(new Return<Void>(null)); + promise.complete(null); return; } - promise.updateIfEmpty(new Throw<Void>( - KeeperException.create(KeeperException.Code.get(rc)))); + promise.completeExceptionally( + KeeperException.create(KeeperException.Code.get(rc))); return; } }, null); @@ -446,35 +444,35 @@ public class Utils { * false if the node doesn't exist, otherwise future will throw exception * */ - public static Future<Boolean> zkDeleteIfNotExist(ZooKeeperClient zkc, String path, ZkVersion version) { + public static CompletableFuture<Boolean> zkDeleteIfNotExist(ZooKeeperClient zkc, String path, ZkVersion version) { ZooKeeper zk; try { zk = zkc.get(); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - return Future.exception(FutureUtils.zkException(e, path)); + return FutureUtils.exception(zkException(e, path)); } catch (InterruptedException e) { - return Future.exception(FutureUtils.zkException(e, path)); + return FutureUtils.exception(zkException(e, path)); } - final Promise<Boolean> promise = new Promise<Boolean>(); + final CompletableFuture<Boolean> promise = new CompletableFuture<Boolean>(); zk.delete(path, version.getZnodeVersion(), new AsyncCallback.VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { if (KeeperException.Code.OK.intValue() == rc ) { - promise.setValue(true); + promise.complete(true); } else if (KeeperException.Code.NONODE.intValue() == rc) { - promise.setValue(false); + promise.complete(false); } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); } } }, null); return promise; } - public static Future<Void> asyncClose(@Nullable AsyncCloseable closeable, + public static CompletableFuture<Void> asyncClose(@Nullable AsyncCloseable closeable, boolean swallowIOException) { if (null == closeable) { - return Future.Void(); + return FutureUtils.Void(); } else if (swallowIOException) { return FutureUtils.ignore(closeable.asyncClose()); } else { @@ -548,7 +546,7 @@ public class Utils { if (null == closeable) { return; } - FutureUtils.result(closeable.asyncClose()); + Utils.ioResult(closeable.asyncClose()); } /** @@ -562,7 +560,7 @@ public class Utils { return; } try { - FutureUtils.result(closeable.asyncClose()); + Utils.ioResult(closeable.asyncClose()); } catch (IOException e) { // no-op. the exception is swallowed. } @@ -575,7 +573,7 @@ public class Utils { * closeables to close * @return future represents the close future */ - public static Future<Void> closeSequence(ExecutorService executorService, + public static CompletableFuture<Void> closeSequence(ExecutorService executorService, AsyncCloseable... closeables) { return closeSequence(executorService, false, closeables); } @@ -588,7 +586,7 @@ public class Utils { * @param closeables list of closeables * @return future represents the close future. */ - public static Future<Void> closeSequence(ExecutorService executorService, + public static CompletableFuture<Void> closeSequence(ExecutorService executorService, boolean ignoreCloseError, AsyncCloseable... closeables) { List<AsyncCloseable> closeableList = Lists.newArrayListWithExpectedSize(closeables.length); @@ -602,7 +600,8 @@ public class Utils { return FutureUtils.processList( closeableList, ignoreCloseError ? AsyncCloseable.CLOSE_FUNC_IGNORE_ERRORS : AsyncCloseable.CLOSE_FUNC, - executorService).map(VoidFunctions.LIST_TO_VOID_FUNC); + executorService + ).thenApply(VoidFunctions.LIST_TO_VOID_FUNC); } /** @@ -636,4 +635,112 @@ public class Utils { return path.substring(0, lastIndex); } + /** + * Convert the <i>throwable</i> to zookeeper related exceptions. + * + * @param throwable cause + * @param path zookeeper path + * @return zookeeper related exceptions + */ + public static Throwable zkException(Throwable throwable, String path) { + if (throwable instanceof KeeperException) { + return new ZKException("Encountered zookeeper exception on " + path, (KeeperException) throwable); + } else if (throwable instanceof ZooKeeperClient.ZooKeeperConnectionException) { + return new ZKException("Encountered zookeeper connection loss on " + path, + KeeperException.Code.CONNECTIONLOSS); + } else if (throwable instanceof InterruptedException) { + return new DLInterruptedException("Interrupted on operating " + path, throwable); + } else { + return new UnexpectedException("Encountered unexpected exception on operatiing " + path, throwable); + } + } + + /** + * Create transmit exception from transmit result. + * + * @param transmitResult + * transmit result (basically bk exception code) + * @return transmit exception + */ + public static BKTransmitException transmitException(int transmitResult) { + return new BKTransmitException("Failed to write to bookkeeper; Error is (" + + transmitResult + ") " + + BKException.getMessage(transmitResult), transmitResult); + } + + /** + * A specific version of {@link FutureUtils#result(CompletableFuture)} to handle known exception issues. + */ + public static <T> T ioResult(CompletableFuture<T> result) throws IOException { + return FutureUtils.result( + result, + (cause) -> { + if (cause instanceof IOException) { + return (IOException) cause; + } else if (cause instanceof KeeperException) { + return new ZKException("Encountered zookeeper exception on waiting result", + (KeeperException) cause); + } else if (cause instanceof BKException) { + return new BKTransmitException("Encountered bookkeeper exception on waiting result", + ((BKException) cause).getCode()); + } else if (cause instanceof InterruptedException) { + return new DLInterruptedException("Interrupted on waiting result", cause); + } else { + return new IOException("Encountered exception on waiting result", cause); + } + }); + } + + /** + * A specific version of {@link FutureUtils#result(CompletableFuture, long, TimeUnit)} + * to handle known exception issues. + */ + public static <T> T ioResult(CompletableFuture<T> result, long timeout, TimeUnit timeUnit) + throws IOException, TimeoutException { + return FutureUtils.result( + result, + (cause) -> { + if (cause instanceof IOException) { + return (IOException) cause; + } else if (cause instanceof KeeperException) { + return new ZKException("Encountered zookeeper exception on waiting result", + (KeeperException) cause); + } else if (cause instanceof BKException) { + return new BKTransmitException("Encountered bookkeeper exception on waiting result", + ((BKException) cause).getCode()); + } else if (cause instanceof InterruptedException) { + return new DLInterruptedException("Interrupted on waiting result", cause); + } else { + return new IOException("Encountered exception on waiting result", cause); + } + }, + timeout, + timeUnit); + } + + /** + * Abort async <i>abortable</i> + * + * @param abortable the {@code AsyncAbortable} object to be aborted, or null, in which case this method + * does nothing. + * @param swallowIOException if true, don't propagate IO exceptions thrown by the {@code abort} methods + * @throws IOException if {@code swallowIOException} is false and {@code abort} throws an {@code IOException} + */ + public static void abort(@Nullable AsyncAbortable abortable, + boolean swallowIOException) + throws IOException { + if (null == abortable) { + return; + } + try { + ioResult(abortable.asyncAbort()); + } catch (Exception ioe) { + if (swallowIOException) { + log.warn("IOException thrown while aborting Abortable {} : ", abortable, ioe); + } else { + throw ioe; + } + } + } + }
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java index 9a61c1c..1dd702f 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java @@ -17,7 +17,7 @@ */ package org.apache.distributedlog.zk; -import org.apache.distributedlog.util.PermitManager; +import org.apache.distributedlog.common.util.PermitManager; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/zk/ZKTransaction.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/zk/ZKTransaction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/zk/ZKTransaction.java index a5da9c0..aeabbfa 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/zk/ZKTransaction.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/zk/ZKTransaction.java @@ -18,18 +18,17 @@ package org.apache.distributedlog.zk; import com.google.common.collect.Lists; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.distributedlog.ZooKeeperClient; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.Transaction; -import com.twitter.util.Future; -import com.twitter.util.Promise; +import org.apache.distributedlog.util.Utils; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.OpResult; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - /** * ZooKeeper Transaction */ @@ -38,14 +37,14 @@ public class ZKTransaction implements Transaction<Object>, AsyncCallback.MultiCa private final ZooKeeperClient zkc; private final List<ZKOp> ops; private final List<org.apache.zookeeper.Op> zkOps; - private final Promise<Void> result; + private final CompletableFuture<Void> result; private final AtomicBoolean done = new AtomicBoolean(false); public ZKTransaction(ZooKeeperClient zkc) { this.zkc = zkc; this.ops = Lists.newArrayList(); this.zkOps = Lists.newArrayList(); - this.result = new Promise<Void>(); + this.result = new CompletableFuture<Void>(); } @Override @@ -60,16 +59,16 @@ public class ZKTransaction implements Transaction<Object>, AsyncCallback.MultiCa } @Override - public Future<Void> execute() { + public CompletableFuture<Void> execute() { if (!done.compareAndSet(false, true)) { return result; } try { zkc.get().multi(zkOps, this, result); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - result.setException(FutureUtils.zkException(e, "")); + result.completeExceptionally(Utils.zkException(e, "")); } catch (InterruptedException e) { - result.setException(FutureUtils.zkException(e, "")); + result.completeExceptionally(Utils.zkException(e, "")); } return result; } @@ -82,7 +81,7 @@ public class ZKTransaction implements Transaction<Object>, AsyncCallback.MultiCa for (int i = 0; i < ops.size(); i++) { ops.get(i).abortOpResult(cause, null); } - FutureUtils.setException(result, cause); + FutureUtils.completeExceptionally(result, cause); } @Override @@ -91,13 +90,13 @@ public class ZKTransaction implements Transaction<Object>, AsyncCallback.MultiCa for (int i = 0; i < ops.size(); i++) { ops.get(i).commitOpResult(results.get(i)); } - FutureUtils.setValue(result, null); + FutureUtils.complete(result, null); } else { KeeperException ke = KeeperException.create(KeeperException.Code.get(rc)); for (int i = 0; i < ops.size(); i++) { ops.get(i).abortOpResult(ke, null != results ? results.get(i) : null); } - FutureUtils.setException(result, ke); + FutureUtils.completeExceptionally(result, ke); } } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/resources/findbugsExclude.xml b/distributedlog-core/src/main/resources/findbugsExclude.xml index 80adec8..40920db 100644 --- a/distributedlog-core/src/main/resources/findbugsExclude.xml +++ b/distributedlog-core/src/main/resources/findbugsExclude.xml @@ -37,4 +37,77 @@ <Method name="run" /> <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER" /> </Match> + <Match> + <Class name="org.apache.distributedlog.BKLogReadHandler$1" /> + <Method name="onSuccess" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.BookKeeperClient$2" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.ReadUtils" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.ReadUtils$2" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.auditor.DLAuditor$2" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.auditor.DLAuditor$8" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.bk.SimpleLedgerAllocator$4" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.bk.SimpleLedgerAllocator$4$1" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.bk.SimpleLedgerAllocator$5" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.impl.acl.ZKAccessControl$4" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.impl.acl.ZKAccessControlManager$1" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.impl.acl.ZKAccessControlManager$1$1" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore$1$1" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.lock.ZKSessionLock" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.lock.ZKSessionLock$12" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.lock.ZKSessionLock$13$1" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.util.Utils" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <Match> + <Class name="org.apache.distributedlog.util.Utils$6" /> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/DLMTestUtil.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/DLMTestUtil.java b/distributedlog-core/src/test/java/org/apache/distributedlog/DLMTestUtil.java index 96d2d1c..d821b05 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/DLMTestUtil.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/DLMTestUtil.java @@ -17,20 +17,21 @@ */ package org.apache.distributedlog; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.distributedlog.api.AsyncLogWriter; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; +import org.apache.distributedlog.api.MetadataAccessor; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.impl.BKNamespaceDriver; import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryWriter; import org.apache.distributedlog.logsegment.LogSegmentEntryStore; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.namespace.NamespaceDriver; import org.apache.distributedlog.util.ConfUtils; -import org.apache.distributedlog.util.FutureUtils; -import org.apache.distributedlog.util.PermitLimiter; -import org.apache.distributedlog.util.RetryPolicyUtils; +import org.apache.distributedlog.common.util.PermitLimiter; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Future; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.conf.ServerConfiguration; @@ -78,7 +79,7 @@ public class DLMTestUtil { new HashMap<Long, LogSegmentMetadata>(children.size()); for (String child : children) { LogSegmentMetadata segment = - FutureUtils.result(LogSegmentMetadata.read(zkc, ledgerPath + "/" + child)); + Utils.ioResult(LogSegmentMetadata.read(zkc, ledgerPath + "/" + child)); LOG.info("Read segment {} : {}", child, segment); segments.put(segment.getLogSegmentSequenceNumber(), segment); } @@ -92,7 +93,7 @@ public class DLMTestUtil { public static DistributedLogManager createNewDLM(String name, DistributedLogConfiguration conf, URI uri) throws Exception { - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(conf).uri(uri).build(); return namespace.openLog(name); } @@ -102,7 +103,7 @@ public class DLMTestUtil { URI uri) throws Exception { // TODO: Metadata Accessor seems to be a legacy object which only used by kestrel // (we might consider deprecating this) - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(conf).uri(uri).build(); return namespace.getNamespaceDriver().getMetadataAccessor(name); } @@ -113,7 +114,7 @@ public class DLMTestUtil { List<LogSegmentMetadata> logSegmentList = dlm.getLogSegments(); LogSegmentMetadata lastSegment = logSegmentList.get(logSegmentList.size() - 1); LogSegmentEntryStore entryStore = dlm.getNamespaceDriver().getLogSegmentEntryStore(NamespaceDriver.Role.READER); - Utils.close(FutureUtils.result(entryStore.openRandomAccessReader(lastSegment, true))); + Utils.close(Utils.ioResult(entryStore.openRandomAccessReader(lastSegment, true))); } finally { dlm.close(); } @@ -313,12 +314,12 @@ public class DLMTestUtil { for (int i = 0; i < controlEntries; ++i) { LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid); record.setControl(); - Await.result(out.write(record)); + Utils.ioResult(out.write(record)); txid += txidStep; } for (int i = 0; i < userEntries; ++i) { LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid); - Await.result(out.write(record)); + Utils.ioResult(out.write(record)); txid += txidStep; } Utils.close(out); @@ -339,7 +340,7 @@ public class DLMTestUtil { throws Exception { BKDistributedLogManager dlm = (BKDistributedLogManager) manager; BKLogWriteHandler writeHandler = dlm.createWriteHandler(false); - FutureUtils.result(writeHandler.lockHandler()); + Utils.ioResult(writeHandler.lockHandler()); // Start a log segment with a given ledger seq number. BookKeeperClient bkc = getBookKeeperClient(dlm); LedgerHandle lh = bkc.get().createLedger(conf.getEnsembleSize(), conf.getWriteQuorumSize(), @@ -377,12 +378,12 @@ public class DLMTestUtil { for (long j = 1; j <= segmentSize; j++) { writer.write(DLMTestUtil.getLogRecordInstance(txid++)); } - FutureUtils.result(writer.flushAndCommit()); + Utils.ioResult(writer.flushAndCommit()); } if (completeLogSegment) { - FutureUtils.result(writeHandler.completeAndCloseLogSegment(writer)); + Utils.ioResult(writeHandler.completeAndCloseLogSegment(writer)); } - FutureUtils.result(writeHandler.unlockHandler()); + Utils.ioResult(writeHandler.unlockHandler()); } public static void injectLogSegmentWithLastDLSN(DistributedLogManager manager, DistributedLogConfiguration conf, @@ -390,7 +391,7 @@ public class DLMTestUtil { boolean recordWrongLastDLSN) throws Exception { BKDistributedLogManager dlm = (BKDistributedLogManager) manager; BKLogWriteHandler writeHandler = dlm.createWriteHandler(false); - FutureUtils.result(writeHandler.lockHandler()); + Utils.ioResult(writeHandler.lockHandler()); // Start a log segment with a given ledger seq number. BookKeeperClient bkc = getBookKeeperClient(dlm); LedgerHandle lh = bkc.get().createLedger(conf.getEnsembleSize(), conf.getWriteQuorumSize(), @@ -425,14 +426,14 @@ public class DLMTestUtil { long txid = startTxID; DLSN wrongDLSN = null; for (long j = 1; j <= segmentSize; j++) { - DLSN dlsn = Await.result(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(txid++))); + DLSN dlsn = Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(txid++))); if (j == (segmentSize - 1)) { wrongDLSN = dlsn; } } assertNotNull(wrongDLSN); if (recordWrongLastDLSN) { - FutureUtils.result(writer.asyncClose()); + Utils.ioResult(writer.asyncClose()); writeHandler.completeAndCloseLogSegment( writeHandler.inprogressZNodeName(writer.getLogSegmentId(), writer.getStartTxId(), writer.getLogSegmentSequenceNumber()), writer.getLogSegmentSequenceNumber(), @@ -443,9 +444,9 @@ public class DLMTestUtil { wrongDLSN.getEntryId(), wrongDLSN.getSlotId()); } else { - FutureUtils.result(writeHandler.completeAndCloseLogSegment(writer)); + Utils.ioResult(writeHandler.completeAndCloseLogSegment(writer)); } - FutureUtils.result(writeHandler.unlockHandler()); + Utils.ioResult(writeHandler.unlockHandler()); } public static void updateSegmentMetadata(ZooKeeperClient zkc, LogSegmentMetadata segment) throws Exception { @@ -469,18 +470,18 @@ public class DLMTestUtil { return conf; } - public static <T> void validateFutureFailed(Future<T> future, Class exClass) { + public static <T> void validateFutureFailed(CompletableFuture<T> future, Class exClass) { try { - Await.result(future); + Utils.ioResult(future); } catch (Exception ex) { LOG.info("Expected: {} Actual: {}", exClass.getName(), ex.getClass().getName()); assertTrue("exceptions types equal", exClass.isInstance(ex)); } } - public static <T> T validateFutureSucceededAndGetResult(Future<T> future) throws Exception { + public static <T> T validateFutureSucceededAndGetResult(CompletableFuture<T> future) throws Exception { try { - return Await.result(future, Duration.fromSeconds(10)); + return Utils.ioResult(future, 10, TimeUnit.SECONDS); } catch (Exception ex) { fail("unexpected exception " + ex.getClass().getName()); throw ex; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/NonBlockingReadsTestUtil.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/NonBlockingReadsTestUtil.java b/distributedlog-core/src/test/java/org/apache/distributedlog/NonBlockingReadsTestUtil.java index 2dbef02..126d337 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/NonBlockingReadsTestUtil.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/NonBlockingReadsTestUtil.java @@ -17,10 +17,11 @@ */ package org.apache.distributedlog; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.exceptions.LogEmptyException; import org.apache.distributedlog.exceptions.LogNotFoundException; import org.apache.distributedlog.exceptions.LogReadException; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,19 +124,19 @@ class NonBlockingReadsTestUtil { for (long i = 0; i < 3; i++) { BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); for (long j = 1; j < segmentSize; j++) { - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); } if (recover) { - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); TimeUnit.MILLISECONDS.sleep(300); writer.abort(); LOG.debug("Recovering Segments"); BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true); - FutureUtils.result(blplm.recoverIncompleteLogSegments()); - FutureUtils.result(blplm.asyncClose()); + Utils.ioResult(blplm.recoverIncompleteLogSegments()); + Utils.ioResult(blplm.asyncClose()); LOG.debug("Recovered Segments"); } else { - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); writer.closeAndComplete(); } TimeUnit.MILLISECONDS.sleep(300); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamReader.java index 922d89e..ae77522 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamReader.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamReader.java @@ -17,17 +17,14 @@ */ package org.apache.distributedlog; -import java.io.ByteArrayInputStream; -import java.net.URI; import java.util.Arrays; +import org.apache.distributedlog.api.DistributedLogManager; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; import org.apache.distributedlog.exceptions.EndOfStreamException; -import com.twitter.util.Await; -import com.twitter.util.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamWriter.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamWriter.java index b5498ba..fc1f241 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamWriter.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamWriter.java @@ -20,8 +20,11 @@ package org.apache.distributedlog; import java.io.ByteArrayInputStream; import java.net.URI; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.exceptions.BKTransmitException; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.util.Utils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -29,9 +32,6 @@ import org.junit.rules.TestName; import org.apache.distributedlog.exceptions.EndOfStreamException; import org.apache.distributedlog.exceptions.WriteException; import org.apache.distributedlog.util.FailpointUtils; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,14 +90,14 @@ public class TestAppendOnlyStreamWriter extends TestDistributedLogBase { // happen very quickly. But we can test that the mechanics of the future write and api are basically // correct. AppendOnlyStreamWriter writer = dlmwriter.getAppendOnlyStreamWriter(); - Future<DLSN> dlsnFuture = writer.write(DLMTestUtil.repeatString("abc", 11).getBytes()); + CompletableFuture<DLSN> dlsnFuture = writer.write(DLMTestUtil.repeatString("abc", 11).getBytes()); // The real problem is the fsync completes before writes are submitted, so it never takes effect. Thread.sleep(1000); - assertFalse(dlsnFuture.isDefined()); + assertFalse(dlsnFuture.isDone()); writer.force(false); // Must not throw. - Await.result(dlsnFuture, Duration.fromSeconds(5)); + Utils.ioResult(dlsnFuture, 5, TimeUnit.SECONDS); writer.close(); dlmwriter.close(); @@ -124,11 +124,11 @@ public class TestAppendOnlyStreamWriter extends TestDistributedLogBase { // happen very quickly. But we can test that the mechanics of the future write and api are basically // correct. AppendOnlyStreamWriter writer = dlmwriter.getAppendOnlyStreamWriter(); - Future<DLSN> dlsnFuture = writer.write(byteStream); + CompletableFuture<DLSN> dlsnFuture = writer.write(byteStream); Thread.sleep(100); // Write hasn't been persisted, position better not be updated. - assertFalse(dlsnFuture.isDefined()); + assertFalse(dlsnFuture.isDone()); assertEquals(0, writer.position()); writer.force(false); // Position guaranteed to be accurate after writer.force(). @@ -167,7 +167,7 @@ public class TestAppendOnlyStreamWriter extends TestDistributedLogBase { // Much much less than the flush time, small enough not to slow down tests too much, just // gives a little more confidence. Thread.sleep(500); - Future<DLSN> dlsnFuture = writer.write(byteStream); + CompletableFuture<DLSN> dlsnFuture = writer.write(byteStream); assertEquals(0, writer.position()); writer.close(); @@ -188,7 +188,7 @@ public class TestAppendOnlyStreamWriter extends TestDistributedLogBase { AppendOnlyStreamWriter writer = dlmwriter.getAppendOnlyStreamWriter(); assertEquals(0, writer.position()); - Await.result(writer.write(byteStream)); + Utils.ioResult(writer.write(byteStream)); Thread.sleep(100); // let WriteCompleteListener have time to run assertEquals(33, writer.position()); @@ -205,12 +205,12 @@ public class TestAppendOnlyStreamWriter extends TestDistributedLogBase { BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name); URI uri = createDLMURI("/" + name); - FutureUtils.result(dlm.getWriterMetadataStore().getLog(uri, name, true, true)); + Utils.ioResult(dlm.getWriterMetadataStore().getLog(uri, name, true, true)); // Log exists but is empty, better not throw. AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter(); byte[] byteStream = DLMTestUtil.repeatString("a", 1025).getBytes(); - Await.result(writer.write(byteStream)); + Utils.ioResult(writer.write(byteStream)); writer.close(); dlm.close(); @@ -266,7 +266,7 @@ public class TestAppendOnlyStreamWriter extends TestDistributedLogBase { BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name); URI uri = createDLMURI("/" + name); - FutureUtils.result(dlm.getWriterMetadataStore().getLog(uri, name, true, true)); + Utils.ioResult(dlm.getWriterMetadataStore().getLog(uri, name, true, true)); // Log exists but is empty, better not throw. AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncBulkWrite.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncBulkWrite.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncBulkWrite.java index 139d935..6efd0c1 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncBulkWrite.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncBulkWrite.java @@ -17,14 +17,14 @@ */ package org.apache.distributedlog; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.exceptions.LogRecordTooLongException; import org.apache.distributedlog.exceptions.WriteCancelledException; import org.apache.distributedlog.exceptions.WriteException; import org.apache.distributedlog.util.FailpointUtils; -import org.apache.distributedlog.util.FutureUtils; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Future; +import org.apache.distributedlog.util.Utils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -83,8 +83,8 @@ public class TestAsyncBulkWrite extends TestDistributedLogBase { records.add(DLMTestUtil.getLogRecordInstance(goodRecs, MAX_LOGRECORD_SIZE + 1)); records.addAll(DLMTestUtil.getLargeLogRecordInstanceList(1, goodRecs)); - Future<List<Future<DLSN>>> futureResults = writer.writeBulk(records); - List<Future<DLSN>> results = validateFutureSucceededAndGetResult(futureResults); + CompletableFuture<List<CompletableFuture<DLSN>>> futureResults = writer.writeBulk(records); + List<CompletableFuture<DLSN>> results = validateFutureSucceededAndGetResult(futureResults); // One future returned for each write. assertEquals(2*goodRecs + 1, results.size()); @@ -160,14 +160,14 @@ public class TestAsyncBulkWrite extends TestDistributedLogBase { // Write one record larger than max seg size. Ledger doesn't roll until next write. int txid = 1; LogRecord record = DLMTestUtil.getLogRecordInstance(txid++, 2048); - Future<DLSN> result = writer.write(record); + CompletableFuture<DLSN> result = writer.write(record); DLSN dlsn = validateFutureSucceededAndGetResult(result); assertEquals(1, dlsn.getLogSegmentSequenceNo()); // Write two more via bulk. Ledger doesn't roll because there's a partial failure. List<LogRecord> records = null; - Future<List<Future<DLSN>>> futureResults = null; - List<Future<DLSN>> results = null; + CompletableFuture<List<CompletableFuture<DLSN>>> futureResults = null; + List<CompletableFuture<DLSN>> results = null; records = new ArrayList<LogRecord>(2); records.add(DLMTestUtil.getLogRecordInstance(txid++, 2048)); records.add(DLMTestUtil.getLogRecordInstance(txid++, MAX_LOGRECORD_SIZE + 1)); @@ -309,15 +309,15 @@ public class TestAsyncBulkWrite extends TestDistributedLogBase { long txIndex) throws Exception { List<LogRecord> records = DLMTestUtil.getLogRecordInstanceList(txIndex, batchSize, recSize); - Future<List<Future<DLSN>>> futureResults = writer.writeBulk(records); + CompletableFuture<List<CompletableFuture<DLSN>>> futureResults = writer.writeBulk(records); assertNotNull(futureResults); - List<Future<DLSN>> results = Await.result(futureResults, Duration.fromSeconds(10)); + List<CompletableFuture<DLSN>> results = Utils.ioResult(futureResults, 10, TimeUnit.SECONDS); assertNotNull(results); assertEquals(results.size(), records.size()); long prevEntryId = 0; DLSN lastDlsn = null; - for (Future<DLSN> result : results) { - DLSN dlsn = Await.result(result, Duration.fromSeconds(10)); + for (CompletableFuture<DLSN> result : results) { + DLSN dlsn = Utils.ioResult(result, 10, TimeUnit.SECONDS); lastDlsn = dlsn; // If we cross a transmission boundary, slot id gets reset. @@ -338,12 +338,12 @@ public class TestAsyncBulkWrite extends TestDistributedLogBase { long txIndex) throws Exception { List<LogRecord> records = DLMTestUtil.getLogRecordInstanceList(txIndex, batchSize, recSize); - Future<List<Future<DLSN>>> futureResults = writer.writeBulk(records); + CompletableFuture<List<CompletableFuture<DLSN>>> futureResults = writer.writeBulk(records); assertNotNull(futureResults); - List<Future<DLSN>> results = Await.result(futureResults, Duration.fromSeconds(10)); + List<CompletableFuture<DLSN>> results = Utils.ioResult(futureResults, 10, TimeUnit.SECONDS); assertNotNull(results); assertEquals(results.size(), records.size()); - for (Future<DLSN> result : results) { + for (CompletableFuture<DLSN> result : results) { validateFutureFailed(result, IOException.class); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderLock.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderLock.java index adceaf9..62ac5ef 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderLock.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderLock.java @@ -17,36 +17,34 @@ */ package org.apache.distributedlog; -import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import org.apache.distributedlog.api.AsyncLogReader; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.exceptions.LockCancelledException; import org.apache.distributedlog.exceptions.LockingException; import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; import org.apache.distributedlog.impl.BKNamespaceDriver; import org.apache.distributedlog.lock.LockClosedException; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.namespace.NamespaceDriver; -import org.apache.distributedlog.subscription.SubscriptionsStore; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.api.subscription.SubscriptionsStore; +import org.apache.distributedlog.common.concurrent.FutureEventListener; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Await; -import com.twitter.util.ExceptionalFunction; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -76,9 +74,9 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { writer.write(DLMTestUtil.getLogRecordInstance(1L)); writer.closeAndComplete(); - Future<AsyncLogReader> futureReader1 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN); - BKAsyncLogReader reader1 = (BKAsyncLogReader) Await.result(futureReader1); - LogRecordWithDLSN record = Await.result(reader1.readNext()); + CompletableFuture<AsyncLogReader> futureReader1 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN); + BKAsyncLogReader reader1 = (BKAsyncLogReader) Utils.ioResult(futureReader1); + LogRecordWithDLSN record = Utils.ioResult(reader1.readNext()); assertEquals(1L, record.getTransactionId()); assertEquals(0L, record.getSequenceId()); DLMTestUtil.verifyLogRecord(record); @@ -89,9 +87,9 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { // simulate a old stream created without readlock path NamespaceDriver driver = dlm.getNamespaceDriver(); ((BKNamespaceDriver) driver).getWriterZKC().get().delete(readLockPath, -1); - Future<AsyncLogReader> futureReader2 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN); - AsyncLogReader reader2 = Await.result(futureReader2); - record = Await.result(reader2.readNext()); + CompletableFuture<AsyncLogReader> futureReader2 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN); + AsyncLogReader reader2 = Utils.ioResult(futureReader2); + record = Utils.ioResult(reader2.readNext()); assertEquals(1L, record.getTransactionId()); assertEquals(0L, record.getSequenceId()); DLMTestUtil.verifyLogRecord(record); @@ -107,19 +105,14 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { final CountDownLatch latch = new CountDownLatch(1); - Future<AsyncLogReader> futureReader1 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN); - futureReader1.flatMap(new ExceptionalFunction<AsyncLogReader, Future<Void>>() { - @Override - public Future<Void> applyE(AsyncLogReader reader) throws IOException { - return reader.asyncClose().map(new AbstractFunction1<Void, Void>() { - @Override - public Void apply(Void result) { + CompletableFuture<AsyncLogReader> futureReader1 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN); + futureReader1 + .thenCompose( + reader -> reader.asyncClose() + .thenApply(result -> { latch.countDown(); return null; - } - }); - } - }); + })); latch.await(); dlm.close(); @@ -133,8 +126,8 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { writer.write(DLMTestUtil.getLogRecordInstance(1L)); writer.closeAndComplete(); - Future<AsyncLogReader> futureReader1 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN); - AsyncLogReader reader1 = Await.result(futureReader1); + CompletableFuture<AsyncLogReader> futureReader1 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN); + AsyncLogReader reader1 = Utils.ioResult(futureReader1); reader1.readNext(); final CountDownLatch acquiredLatch = new CountDownLatch(1); @@ -142,12 +135,12 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { Thread acquireThread = new Thread(new Runnable() { @Override public void run() { - Future<AsyncLogReader> futureReader2 = null; + CompletableFuture<AsyncLogReader> futureReader2 = null; DistributedLogManager dlm2 = null; try { dlm2 = createNewDLM(conf, name); futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN); - AsyncLogReader reader2 = Await.result(futureReader2); + AsyncLogReader reader2 = Utils.ioResult(futureReader2); acquired.set(true); acquiredLatch.countDown(); } catch (Exception ex) { @@ -172,10 +165,10 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { dlm.close(); } - int countDefined(ArrayList<Future<AsyncLogReader>> readers) { + int countDefined(ArrayList<CompletableFuture<AsyncLogReader>> readers) { int done = 0; - for (Future<AsyncLogReader> futureReader : readers) { - if (futureReader.isDefined()) { + for (CompletableFuture<AsyncLogReader> futureReader : readers) { + if (futureReader.isDone()) { done++; } } @@ -193,7 +186,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { int count = 5; final CountDownLatch acquiredLatch = new CountDownLatch(count); - final ArrayList<Future<AsyncLogReader>> readers = new ArrayList<Future<AsyncLogReader>>(count); + final ArrayList<CompletableFuture<AsyncLogReader>> readers = new ArrayList<CompletableFuture<AsyncLogReader>>(count); for (int i = 0; i < count; i++) { readers.add(null); } @@ -201,7 +194,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { for (int i = 0; i < count; i++) { dlms[i] = createNewDLM(conf, name); readers.set(i, dlms[i].getAsyncLogReaderWithLock(DLSN.InitialDLSN)); - readers.get(i).addEventListener(new FutureEventListener<AsyncLogReader>() { + readers.get(i).whenComplete(new FutureEventListener<AsyncLogReader>() { @Override public void onSuccess(AsyncLogReader reader) { acquiredLatch.countDown(); @@ -232,16 +225,17 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { writer.closeAndComplete(); DistributedLogManager dlm1 = createNewDLM(conf, name); - Future<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); - AsyncLogReader reader1 = Await.result(futureReader1); + CompletableFuture<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); + AsyncLogReader reader1 = Utils.ioResult(futureReader1); BKDistributedLogManager dlm2 = (BKDistributedLogManager) createNewDLM(conf, name); - Future<AsyncLogReader> futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN); + CompletableFuture<AsyncLogReader> futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN); dlm2.close(); try { - Await.result(futureReader2); + Utils.ioResult(futureReader2); fail("should have thrown exception!"); + } catch (CancellationException ce) { } catch (LockClosedException ex) { } catch (LockCancelledException ex) { } @@ -256,7 +250,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { String name = runtime.getMethodName(); URI uri = createDLMURI("/" + name); ensureURICreated(uri); - DistributedLogNamespace ns0 = DistributedLogNamespaceBuilder.newBuilder() + Namespace ns0 = NamespaceBuilder.newBuilder() .conf(conf) .uri(uri) .build(); @@ -266,13 +260,13 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { writer.write(DLMTestUtil.getLogRecordInstance(2L)); writer.closeAndComplete(); - DistributedLogNamespace ns1 = DistributedLogNamespaceBuilder.newBuilder() + Namespace ns1 = NamespaceBuilder.newBuilder() .conf(conf) .uri(uri) .build(); DistributedLogManager dlm1 = ns1.openLog(name); - Future<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); - AsyncLogReader reader1 = Await.result(futureReader1); + CompletableFuture<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); + AsyncLogReader reader1 = Utils.ioResult(futureReader1); ZooKeeperClientUtils.expireSession(((BKNamespaceDriver) ns1.getNamespaceDriver()).getWriterZKC(), zkServers, 1000); // The result of expireSession is somewhat non-deterministic with this lock. @@ -280,12 +274,12 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { // the moment rather than make it deterministic we accept either result. boolean success = false; try { - Await.result(reader1.readNext()); + Utils.ioResult(reader1.readNext()); success = true; } catch (LockingException ex) { } if (success) { - Await.result(reader1.readNext()); + Utils.ioResult(reader1.readNext()); } Utils.close(reader1); @@ -305,15 +299,16 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { writer.closeAndComplete(); DistributedLogManager dlm1 = createNewDLM(conf, name); - Future<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); - AsyncLogReader reader1 = Await.result(futureReader1); + CompletableFuture<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); + AsyncLogReader reader1 = Utils.ioResult(futureReader1); DistributedLogManager dlm2 = createNewDLM(conf, name); - Future<AsyncLogReader> futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN); + CompletableFuture<AsyncLogReader> futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN); try { - FutureUtils.cancel(futureReader2); - Await.result(futureReader2); + futureReader2.cancel(true); + Utils.ioResult(futureReader2); fail("Should fail getting log reader as it is cancelled"); + } catch (CancellationException ce) { } catch (LockClosedException ex) { } catch (LockCancelledException ex) { } catch (OwnershipAcquireFailedException oafe) { @@ -322,7 +317,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN); Utils.close(reader1); - Await.result(futureReader2); + Utils.ioResult(futureReader2); dlm0.close(); dlm1.close(); @@ -339,13 +334,13 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { writer.closeAndComplete(); DistributedLogManager dlm1 = createNewDLM(conf, name); - Future<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); + CompletableFuture<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); // Must not throw or cancel or do anything bad, future already completed. - Await.result(futureReader1); - FutureUtils.cancel(futureReader1); - AsyncLogReader reader1 = Await.result(futureReader1); - Await.result(reader1.readNext()); + Utils.ioResult(futureReader1); + futureReader1.cancel(true); + AsyncLogReader reader1 = Utils.ioResult(futureReader1); + Utils.ioResult(reader1.readNext()); dlm0.close(); dlm1.close(); @@ -361,13 +356,13 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { writer.closeAndComplete(); DistributedLogManager dlm1 = createNewDLM(conf, name); - Future<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); - Future<AsyncLogReader> futureReader2 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); + CompletableFuture<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); + CompletableFuture<AsyncLogReader> futureReader2 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); // Both use the same client id, so there's no lock conflict. Not necessarily ideal, but how the // system currently works. - Await.result(futureReader1); - Await.result(futureReader2); + Utils.ioResult(futureReader1); + Utils.ioResult(futureReader2); dlm0.close(); dlm1.close(); @@ -413,7 +408,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { private void readEntries(AsyncLogReader reader) { try { for (int i = 0; i < 300; i++) { - LogRecordWithDLSN record = Await.result(reader.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); currentDLSN.set(record.getDlsn()); } } catch (Exception ex) { @@ -446,7 +441,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { localConf.setNumWorkerThreads(2); localConf.setLockTimeout(Long.MAX_VALUE); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(localConf).uri(uri).clientId("main").build(); DistributedLogManager dlm0 = namespace.openLog(name); @@ -457,27 +452,27 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { AtomicReference<DLSN> currentDLSN = new AtomicReference<DLSN>(DLSN.InitialDLSN); String clientId1 = "reader1"; - DistributedLogNamespace namespace1 = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace1 = NamespaceBuilder.newBuilder() .conf(localConf).uri(uri).clientId(clientId1).build(); DistributedLogManager dlm1 = namespace1.openLog(name); String clientId2 = "reader2"; - DistributedLogNamespace namespace2 = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace2 = NamespaceBuilder.newBuilder() .conf(localConf).uri(uri).clientId(clientId2).build(); DistributedLogManager dlm2 = namespace2.openLog(name); String clientId3 = "reader3"; - DistributedLogNamespace namespace3 = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace3 = NamespaceBuilder.newBuilder() .conf(localConf).uri(uri).clientId(clientId3).build(); DistributedLogManager dlm3 = namespace3.openLog(name); LOG.info("{} is opening reader on stream {}", clientId1, name); - Future<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); - AsyncLogReader reader1 = Await.result(futureReader1); + CompletableFuture<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); + AsyncLogReader reader1 = Utils.ioResult(futureReader1); LOG.info("{} opened reader on stream {}", clientId1, name); LOG.info("{} is opening reader on stream {}", clientId2, name); - Future<AsyncLogReader> futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN); + CompletableFuture<AsyncLogReader> futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN); LOG.info("{} is opening reader on stream {}", clientId3, name); - Future<AsyncLogReader> futureReader3 = dlm3.getAsyncLogReaderWithLock(DLSN.InitialDLSN); + CompletableFuture<AsyncLogReader> futureReader3 = dlm3.getAsyncLogReaderWithLock(DLSN.InitialDLSN); ExecutorService executorService = Executors.newCachedThreadPool(); @@ -485,26 +480,26 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { new ReadRecordsListener(currentDLSN, clientId2, executorService); ReadRecordsListener listener3 = new ReadRecordsListener(currentDLSN, clientId3, executorService); - futureReader2.addEventListener(listener2); - futureReader3.addEventListener(listener3); + futureReader2.whenComplete(listener2); + futureReader3.whenComplete(listener3); // Get reader1 and start reading. for ( ; recordCount < 200; recordCount++) { - LogRecordWithDLSN record = Await.result(reader1.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader1.readNext()); currentDLSN.set(record.getDlsn()); } // Take a break, reader2 decides to stop waiting and cancels. Thread.sleep(1000); assertFalse(listener2.done()); - FutureUtils.cancel(futureReader2); + futureReader2.cancel(true); listener2.getLatch().await(); assertTrue(listener2.done()); assertTrue(listener2.failed()); // Reader1 starts reading again. for (; recordCount < 300; recordCount++) { - LogRecordWithDLSN record = Await.result(reader1.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader1.readNext()); currentDLSN.set(record.getDlsn()); } @@ -519,12 +514,12 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { assertEquals(new DLSN(3, 99, 0), currentDLSN.get()); try { - Await.result(futureReader2); + Utils.ioResult(futureReader2); } catch (Exception ex) { // Can't get this one to close it--the dlm will take care of it. } - Utils.close(Await.result(futureReader3)); + Utils.close(Utils.ioResult(futureReader3)); dlm1.close(); namespace1.close(); @@ -553,7 +548,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { for (long i = 0; i < 3; i++) { BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); for (long j = 1; j <= 10; j++) { - DLSN dlsn = Await.result(writer.write(DLMTestUtil.getEmptyLogRecordInstance(txid++))); + DLSN dlsn = Utils.ioResult(writer.write(DLMTestUtil.getEmptyLogRecordInstance(txid++))); if (i == 1 && j == 1L) { readDLSN = dlsn; } @@ -561,10 +556,10 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { writer.closeAndComplete(); } - BKAsyncLogReader reader0 = (BKAsyncLogReader) Await.result(dlm.getAsyncLogReaderWithLock(subscriberId)); + BKAsyncLogReader reader0 = (BKAsyncLogReader) Utils.ioResult(dlm.getAsyncLogReaderWithLock(subscriberId)); assertEquals(DLSN.NonInclusiveLowerBound, reader0.getStartDLSN()); long numTxns = 0; - LogRecordWithDLSN record = Await.result(reader0.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader0.readNext()); while (null != record) { DLMTestUtil.verifyEmptyLogRecord(record); ++numTxns; @@ -574,18 +569,18 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { if (txid - 1 == numTxns) { break; } - record = Await.result(reader0.readNext()); + record = Utils.ioResult(reader0.readNext()); } assertEquals(txid - 1, numTxns); Utils.close(reader0); SubscriptionsStore subscriptionsStore = dlm.getSubscriptionsStore(); - Await.result(subscriptionsStore.advanceCommitPosition(subscriberId, readDLSN)); - BKAsyncLogReader reader1 = (BKAsyncLogReader) Await.result(dlm.getAsyncLogReaderWithLock(subscriberId)); + Utils.ioResult(subscriptionsStore.advanceCommitPosition(subscriberId, readDLSN)); + BKAsyncLogReader reader1 = (BKAsyncLogReader) Utils.ioResult(dlm.getAsyncLogReaderWithLock(subscriberId)); assertEquals(readDLSN, reader1.getStartDLSN()); numTxns = 0; long startTxID = 10L; - record = Await.result(reader1.readNext()); + record = Utils.ioResult(reader1.readNext()); while (null != record) { DLMTestUtil.verifyEmptyLogRecord(record); ++numTxns; @@ -596,7 +591,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { if (startTxID == txid - 1) { break; } - record = Await.result(reader1.readNext()); + record = Utils.ioResult(reader1.readNext()); } assertEquals(txid - 1, startTxID); assertEquals(20, numTxns);