[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-distributedlog/pull/133 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user sijie commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r122609664 --- Diff: distributedlog-benchmark/conf/log4j.properties --- @@ -30,11 +30,7 @@ log4j.logger.org.apache.zookeeper=INFO log4j.logger.org.apache.bookkeeper=INFO # redirect executor output to executors.log since slow op warnings can be quite verbose -log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors -log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors -log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false --- End diff -- MonitoredFuturePool and MonitoredScheduledThreadPoolExecutor are already removed in this patch. because in Java8, you can configure a thread where the callbacks will be executed, there is no need for additional future pool, we just use the ordered scheduler. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user leighst commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r122056689 --- Diff: distributedlog-benchmark/conf/log4j.properties --- @@ -30,11 +30,7 @@ log4j.logger.org.apache.zookeeper=INFO log4j.logger.org.apache.bookkeeper=INFO # redirect executor output to executors.log since slow op warnings can be quite verbose -log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors -log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors -log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false --- End diff -- i think this will result in duplicate messages? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user sijie commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r121057575 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java --- @@ -201,33 +183,31 @@ public void safeRun() { * Begin asynchronous lock acquire, but ensure that the returned future is satisfied on an * executor service thread. */ -Future acquireLockOnExecutorThread(DistributedLock lock) throws LockingException { -final Future acquireFuture = lock.asyncAcquire(); +CompletableFuture acquireLockOnExecutorThread(DistributedLock lock) throws LockingException { +final CompletableFuture acquireFuture = lock.asyncAcquire(); // The future we return must be satisfied on an executor service thread. If we simply // return the future returned by asyncAcquire, user callbacks may end up running in // the lock state executor thread, which will cause deadlocks and introduce latency // etc. -final Promise threadAcquirePromise = new Promise(); -threadAcquirePromise.setInterruptHandler(new Function() { -@Override -public BoxedUnit apply(Throwable t) { -FutureUtils.cancel(acquireFuture); -return null; +final CompletableFuture threadAcquirePromise = new CompletableFuture(); +threadAcquirePromise.whenComplete((value, cause) -> { --- End diff -- correctly, java8 future doesn't have the concept of interrupt. I try to simulate the similar thing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user sijie commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r121057495 --- Diff: distributedlog-benchmark/pom.xml --- @@ -27,12 +27,12 @@ org.apache.distributedlog - distributedlog-client + distributedlog-proxy-client --- End diff -- this change was based on #132. github doesn't allow pull request based on the other branch. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user leighst commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r121046009 --- Diff: distributedlog-benchmark/pom.xml --- @@ -27,12 +27,12 @@ org.apache.distributedlog - distributedlog-client + distributedlog-proxy-client --- End diff -- why? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user leighst commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r121046188 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java --- @@ -201,33 +183,31 @@ public void safeRun() { * Begin asynchronous lock acquire, but ensure that the returned future is satisfied on an * executor service thread. */ -Future acquireLockOnExecutorThread(DistributedLock lock) throws LockingException { -final Future acquireFuture = lock.asyncAcquire(); +CompletableFuture acquireLockOnExecutorThread(DistributedLock lock) throws LockingException { +final CompletableFuture acquireFuture = lock.asyncAcquire(); // The future we return must be satisfied on an executor service thread. If we simply // return the future returned by asyncAcquire, user callbacks may end up running in // the lock state executor thread, which will cause deadlocks and introduce latency // etc. -final Promise threadAcquirePromise = new Promise(); -threadAcquirePromise.setInterruptHandler(new Function() { -@Override -public BoxedUnit apply(Throwable t) { -FutureUtils.cancel(acquireFuture); -return null; +final CompletableFuture threadAcquirePromise = new CompletableFuture(); +threadAcquirePromise.whenComplete((value, cause) -> { --- End diff -- do i understand correctly java8 future doesn't really have the concept of interrupt? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user leighst commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r121046543 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java --- @@ -635,4 +631,80 @@ public static String getParent(final String path) { return path.substring(0, lastIndex); } +/** + * Convert the throwable to zookeeper related exceptions. + * + * @param throwable cause + * @param path zookeeper path + * @return zookeeper related exceptions + */ +public static Throwable zkException(Throwable throwable, String path) { +if (throwable instanceof KeeperException) { +return new ZKException("Encountered zookeeper exception on " + path, (KeeperException) throwable); +} else if (throwable instanceof ZooKeeperClient.ZooKeeperConnectionException) { +return new ZKException("Encountered zookeeper connection loss on " + path, +KeeperException.Code.CONNECTIONLOSS); +} else if (throwable instanceof InterruptedException) { +return new DLInterruptedException("Interrupted on operating " + path, throwable); +} else { +return new UnexpectedException("Encountered unexpected exception on operatiing " + path, throwable); +} +} + +/** + * Create transmit exception from transmit result. + * + * @param transmitResult + * transmit result (basically bk exception code) + * @return transmit exception + */ +public static BKTransmitException transmitException(int transmitResult) { +return new BKTransmitException("Failed to write to bookkeeper; Error is (" ++ transmitResult + ") " ++ BKException.getMessage(transmitResult), transmitResult); +} + +public static T ioResult(CompletableFuture result) throws IOException { --- End diff -- why not have a java8 version of FutureUtils? seems weird to throw all of this into one file --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984925 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java --- @@ -445,35 +440,35 @@ public void processResult(int rc, String path, Object ctx) { * false if the node doesn't exist, otherwise future will throw exception * */ -public static Future zkDeleteIfNotExist(ZooKeeperClient zkc, String path, ZkVersion version) { +public static CompletableFuture zkDeleteIfNotExist(ZooKeeperClient zkc, String path, ZkVersion version) { ZooKeeper zk; try { zk = zkc.get(); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { -return Future.exception(FutureUtils.zkException(e, path)); +return FutureUtils.exception(zkException(e, path)); } catch (InterruptedException e) { -return Future.exception(FutureUtils.zkException(e, path)); +return FutureUtils.exception(zkException(e, path)); } -final Promise promise = new Promise(); +final CompletableFuture promise = new CompletableFuture(); zk.delete(path, version.getZnodeVersion(), new AsyncCallback.VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { if (KeeperException.Code.OK.intValue() == rc ) { -promise.setValue(true); +promise.complete(true); } else if (KeeperException.Code.NONODE.intValue() == rc) { -promise.setValue(false); +promise.complete(false); } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); } } }, null); return promise; } -public static Future asyncClose(@Nullable AsyncCloseable closeable, +public static CompletableFuture asyncClose(@Nullable AsyncCloseable closeable, boolean swallowIOException) { --- End diff -- Please also do the code alignment change here, and line 573, 586. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984863 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/metadata/MetadataUpdater.java --- @@ -56,7 +56,7 @@ * ledger sequence number to change. * @return new log segment */ -Future changeSequenceNumber(LogSegmentMetadata segment, +CompletableFuture changeSequenceNumber(LogSegmentMetadata segment, long logSegmentSeqNo); --- End diff -- Please also do the code alignment change here, and line 102. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984852 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java --- @@ -59,7 +59,7 @@ * @param readerId the reader id used for lock * @return the read lock */ -Future createReadLock(LogMetadataForReader metadata, +CompletableFuture createReadLock(LogMetadataForReader metadata, Optional readerId); --- End diff -- Please also do the code alignment change here, and line 83. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984844 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java --- @@ -73,7 +72,7 @@ private String formatLogSegmentSequenceNumber(long logSegmentSeqNo) { } @Override -public Future changeSequenceNumber(LogSegmentMetadata segment, +public CompletableFuture changeSequenceNumber(LogSegmentMetadata segment, long logSegmentSeqNo) { --- End diff -- Please also do the code alignment change here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984837 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/logsegment/LogSegmentEntryStore.java --- @@ -58,7 +57,7 @@ * @param startEntryId the start entry id * @return future represent the opened reader */ -Future openReader(LogSegmentMetadata segment, +CompletableFuture openReader(LogSegmentMetadata segment, long startEntryId); --- End diff -- Please also do the code alignment change here, and line 71 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984828 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java --- @@ -1088,9 +1063,9 @@ public String getActionName() { }); } -private Future checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher, +private CompletableFuture checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher, final boolean wait) { --- End diff -- Please also do the code alignment change here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984799 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java --- @@ -237,15 +227,13 @@ public DistributedLock createWriteLock(LogMetadataForWriter metadata) { // Create Read Lock // -private Future ensureReadLockPathExist(final LogMetadata logMetadata, +private CompletableFuture ensureReadLockPathExist(final LogMetadata logMetadata, final String readLockPath) { --- End diff -- Please also do the code alignment change here, and line 269, 312, 521, 572. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984772 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java --- @@ -186,13 +185,13 @@ LedgerAllocator createLedgerAllocator(LogMetadataForWriter logMetadata, // @Override -public Future openReader(LogSegmentMetadata segment, +public CompletableFuture openReader(LogSegmentMetadata segment, long startEntryId) { --- End diff -- Please also do the code alignment change here, also at line 244. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984743 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java --- @@ -350,40 +349,40 @@ public void process(WatchedEvent event) { } @Override -public Future getLogSegment(String logSegmentPath) { +public CompletableFuture getLogSegment(String logSegmentPath) { return LogSegmentMetadata.read(zkc, logSegmentPath, skipMinVersionCheck); } -Future>> zkGetLogSegmentNames(String logSegmentsPath, Watcher watcher) { -Promise>> result = new Promise>>(); +CompletableFuture>> zkGetLogSegmentNames(String logSegmentsPath, Watcher watcher) { +CompletableFuture>> result = new CompletableFuture>>(); try { zkc.get().getChildren(logSegmentsPath, watcher, this, result); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { -result.setException(FutureUtils.zkException(e, logSegmentsPath)); +result.completeExceptionally(Utils.zkException(e, logSegmentsPath)); } catch (InterruptedException e) { -result.setException(FutureUtils.zkException(e, logSegmentsPath)); +result.completeExceptionally(Utils.zkException(e, logSegmentsPath)); } return result; } @Override @SuppressWarnings("unchecked") public void processResult(int rc, String path, Object ctx, List children, Stat stat) { -Promise>> result = ((Promise>>) ctx); +CompletableFuture>> result = ((CompletableFuture>>) ctx); if (KeeperException.Code.OK.intValue() == rc) { /** cversion: the number of changes to the children of this znode **/ ZkVersion zkVersion = new ZkVersion(stat.getCversion()); -result.setValue(new Versioned(children, zkVersion)); +result.complete(new Versioned(children, zkVersion)); } else if (KeeperException.Code.NONODE.intValue() == rc) { -result.setException(new LogNotFoundException("Log " + path + " not found")); +result.completeExceptionally(new LogNotFoundException("Log " + path + " not found")); } else { -result.setException(new ZKException("Failed to get log segments from " + path, +result.completeExceptionally(new ZKException("Failed to get log segments from " + path, KeeperException.Code.get(rc))); } } @Override -public Future>> getLogSegmentNames(String logSegmentsPath, +public CompletableFuture>> getLogSegmentNames(String logSegmentsPath, LogSegmentNamesListener listener) { --- End diff -- Please also do the code alignment change here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984733 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java --- @@ -105,73 +103,71 @@ public ConcurrentObtainException(Phase phase, String msg) { // Allocated Ledger LedgerHandle allocatedLh = null; -Future closeFuture = null; -final LinkedList> ledgerDeletions = -new LinkedList>(); +CompletableFuture closeFuture = null; +final LinkedList> ledgerDeletions = +new LinkedList>(); // Ledger configuration private final QuorumConfigProvider quorumConfigProvider; -static Future> getAndCreateAllocationData(final String allocatePath, +static CompletableFuture> getAndCreateAllocationData(final String allocatePath, --- End diff -- Please also do the code alignment change here, and also at line 128, 159. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984686 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java --- @@ -198,52 +196,52 @@ public synchronized BookKeeper get() throws IOException { } // Util functions -public Future createLedger(int ensembleSize, - int writeQuorumSize, - int ackQuorumSize) { +public CompletableFuture createLedger(int ensembleSize, +int writeQuorumSize, +int ackQuorumSize) { BookKeeper bk; try { bk = get(); } catch (IOException ioe) { -return Future.exception(ioe); +return FutureUtils.exception(ioe); } -final Promise promise = new Promise(); +final CompletableFuture promise = new CompletableFuture(); bk.asyncCreateLedger(ensembleSize, writeQuorumSize, ackQuorumSize, BookKeeper.DigestType.CRC32, passwd, new AsyncCallback.CreateCallback() { @Override public void createComplete(int rc, LedgerHandle lh, Object ctx) { if (BKException.Code.OK == rc) { -promise.updateIfEmpty(new Return(lh)); +promise.complete(lh); } else { -promise.updateIfEmpty(new Throw(BKException.create(rc))); + promise.completeExceptionally(BKException.create(rc)); } } }, null); return promise; } -public Future deleteLedger(long lid, +public CompletableFuture deleteLedger(long lid, final boolean ignoreNonExistentLedger) { --- End diff -- Please also do the code alignment change here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984676 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java --- @@ -483,23 +482,23 @@ protected long assignLogSegmentSequenceNumber() throws IOException { } protected BKLogSegmentWriter doStartLogSegment(long txId, boolean bestEffort, boolean allowMaxTxID) throws IOException { -return FutureUtils.result(asyncStartLogSegment(txId, bestEffort, allowMaxTxID)); +return Utils.ioResult(asyncStartLogSegment(txId, bestEffort, allowMaxTxID)); } -protected Future asyncStartLogSegment(final long txId, +protected CompletableFuture asyncStartLogSegment(final long txId, final boolean bestEffort, --- End diff -- Please also do the code alignment change here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984610 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java --- @@ -397,15 +398,15 @@ private Long sum(List values) { } @Override -public Future asyncAbort() { +public CompletableFuture asyncAbort() { return asyncClose(); } -public Future asyncReadLastUserRecord(final LogSegmentMetadata l) { +public CompletableFuture asyncReadLastUserRecord(final LogSegmentMetadata l) { return asyncReadLastRecord(l, false, false, false); } -public Future asyncReadLastRecord(final LogSegmentMetadata l, +public CompletableFuture asyncReadLastRecord(final LogSegmentMetadata l, final boolean fence, --- End diff -- Please also do the code alignment change here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984594 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java --- @@ -525,75 +495,63 @@ public BKSyncLogWriter startLogSegmentNonPartitioned() throws IOException { */ @Override public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException { -return (BKAsyncLogWriter) FutureUtils.result(openAsyncLogWriter()); +return (BKAsyncLogWriter) Utils.ioResult(openAsyncLogWriter()); } @Override -public Future openAsyncLogWriter() { +public CompletableFuture openAsyncLogWriter() { try { checkClosedOrInError("startLogSegmentNonPartitioned"); } catch (AlreadyClosedException e) { -return Future.exception(e); +return FutureUtils.exception(e); } -Future createWriteHandleFuture; +CompletableFuture createWriteHandleFuture; synchronized (this) { // 1. create the locked write handler createWriteHandleFuture = asyncCreateWriteHandler(true); } -return createWriteHandleFuture.flatMap(new AbstractFunction1>() { -@Override -public Future apply(final BKLogWriteHandler writeHandler) { -final BKAsyncLogWriter writer; -synchronized (BKDistributedLogManager.this) { -// 2. create the writer with the handler -writer = new BKAsyncLogWriter( -conf, -dynConf, -BKDistributedLogManager.this, -writeHandler, -featureProvider, -statsLogger); -} -// 3. recover the incomplete log segments -return writeHandler.recoverIncompleteLogSegments() -.map(new AbstractFunction1() { -@Override -public AsyncLogWriter apply(Long lastTxId) { -// 4. update last tx id if successfully recovered -writer.setLastTxId(lastTxId); -return writer; -} -}).onFailure(new AbstractFunction1() { -@Override -public BoxedUnit apply(Throwable cause) { -// 5. close the writer if recovery failed -writer.asyncAbort(); -return BoxedUnit.UNIT; -} -}); +return createWriteHandleFuture.thenCompose(writeHandler -> { +final BKAsyncLogWriter writer; +synchronized (BKDistributedLogManager.this) { +// 2. create the writer with the handler +writer = new BKAsyncLogWriter( +conf, +dynConf, +BKDistributedLogManager.this, +writeHandler, +featureProvider, +statsLogger); } +// 3. recover the incomplete log segments +return writeHandler.recoverIncompleteLogSegments() +.thenApply(lastTxId -> { +// 4. update last tx id if successfully recovered +writer.setLastTxId(lastTxId); +return (AsyncLogWriter) writer; +}) +.whenComplete((lastTxId, cause) -> { +if (null != cause) { +// 5. close the writer if recovery failed +writer.asyncAbort(); +} +}); }); } @Override -public Future getDLSNNotLessThanTxId(final long fromTxnId) { -return getLogSegmentsAsync().flatMap(new AbstractFunction1, Future>() { -@Override -public Future apply(List segments) { -return getDLSNNotLessThanTxId(fromTxnId, segments); -} -}); +public CompletableFuture getDLSNNotLessThanTxId(final long fromTxnId) { +return getLogSegmentsAsync().thenCompose(segments -> getDLSNNotLessThanTxId(fromTxnId, segments)); } -private Future getDLSNNotLessThanTxId(long fromTxnId, +priv
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984515 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java --- @@ -206,7 +197,7 @@ private BKLogSegmentWriter getCachedLogSegmentWriter() throws WriteException { } } -private Future getLogSegmentWriter(long firstTxid, +private CompletableFuture getLogSegmentWriter(long firstTxid, boolean bestEffort, --- End diff -- Please also do the code alignment change in this file, at line 201, 212, 242, 272, --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984462 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java --- @@ -404,7 +392,7 @@ public String getStreamName() { * num entries to read * @return A promise that satisfied with a non-empty list of log records with their DLSN. */ -private synchronized Future> readInternal(int numEntries, +private synchronized CompletableFuture> readInternal(int numEntries, long deadlineTime, --- End diff -- Please also do the code alignment change here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984456 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java --- @@ -381,16 +369,16 @@ public String getStreamName() { * @return A promise that when satisfied will contain the Log Record with its DLSN. */ @Override -public synchronized Future readNext() { -return readInternal(1, 0, TimeUnit.MILLISECONDS).map(READ_NEXT_MAP_FUNCTION); +public synchronized CompletableFuture readNext() { +return readInternal(1, 0, TimeUnit.MILLISECONDS).thenApply(READ_NEXT_MAP_FUNCTION); } -public synchronized Future> readBulk(int numEntries) { +public synchronized CompletableFuture> readBulk(int numEntries) { return readInternal(numEntries, 0, TimeUnit.MILLISECONDS); } @Override -public synchronized Future> readBulk(int numEntries, +public synchronized CompletableFuture> readBulk(int numEntries, long waitTime, --- End diff -- Please also do the code alignment change here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984432 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java --- @@ -357,80 +354,72 @@ private void truncateLogSegmentsIfNecessary(BKLogWriteHandler writeHandler) { // skip scheduling if there is task that's already running // synchronized (this) { -if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined())) { +if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDone())) { lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep); } } } -private Future asyncStartNewLogSegment(final BKLogWriteHandler writeHandler, +private CompletableFuture asyncStartNewLogSegment(final BKLogWriteHandler writeHandler, final long startTxId, --- End diff -- Please also do the code alignment change here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
GitHub user sijie opened a pull request: https://github.com/apache/incubator-distributedlog/pull/133 DL-124: Use Java8 Future rather than twitter Future Switch to use Java8 CompletableFuture, to reduce dependencies introduced by twitter future and make it more friendly to users (users don't think of using which version of scala). This change is based on #132 . Gitsha ce0686e is the change to review. The changes: - Change Future to CompletableFuture - Map to thenApply - flatMap to thenCompose - Added a FutureEventListener, and switch addEvenListener to whenComplete (or whenCompleteAsync) - setValue to complete - setException to completeExceptionally - add rescue, ignore, ensure to FutureUtils as util functions. You can merge this pull request into a Git repository by running: $ git pull https://github.com/sijie/incubator-distributedlog change_twitter_future_to_java_future Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-distributedlog/pull/133.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #133 commit 54c2de047e1656e34ead7fb54070441afd9c140d Author: Sijie Guo Date: 2017-05-26T22:05:43Z Re-organize the distributedlog modules - move proxy related class from protocol to proxy-protocol, changed client and service to proxy-client and proxy-service commit 67e76150b103422f16883857a1c1345cf044fb45 Author: Sijie Guo Date: 2017-05-27T07:04:40Z Use integration for exception code rather than thrift generated StatusCode commit 6e587869f87cdce50ae93ba3d52767719d1ab5a6 Author: Sijie Guo Date: 2017-05-27T07:34:02Z Use the latest thrift version for distributedlog-core and remove scrooge commit ce0686e30e89c75ffce81473de5a0264d5d95f58 Author: Sijie Guo Date: 2017-05-29T23:06:19Z Change Twitter Future to Java8 CompletableFuture --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---