Github user jiazhai commented on a diff in the pull request: https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984594 --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java --- @@ -525,75 +495,63 @@ public BKSyncLogWriter startLogSegmentNonPartitioned() throws IOException { */ @Override public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException { - return (BKAsyncLogWriter) FutureUtils.result(openAsyncLogWriter()); + return (BKAsyncLogWriter) Utils.ioResult(openAsyncLogWriter()); } @Override - public Future<AsyncLogWriter> openAsyncLogWriter() { + public CompletableFuture<AsyncLogWriter> openAsyncLogWriter() { try { checkClosedOrInError("startLogSegmentNonPartitioned"); } catch (AlreadyClosedException e) { - return Future.exception(e); + return FutureUtils.exception(e); } - Future<BKLogWriteHandler> createWriteHandleFuture; + CompletableFuture<BKLogWriteHandler> createWriteHandleFuture; synchronized (this) { // 1. create the locked write handler createWriteHandleFuture = asyncCreateWriteHandler(true); } - return createWriteHandleFuture.flatMap(new AbstractFunction1<BKLogWriteHandler, Future<AsyncLogWriter>>() { - @Override - public Future<AsyncLogWriter> apply(final BKLogWriteHandler writeHandler) { - final BKAsyncLogWriter writer; - synchronized (BKDistributedLogManager.this) { - // 2. create the writer with the handler - writer = new BKAsyncLogWriter( - conf, - dynConf, - BKDistributedLogManager.this, - writeHandler, - featureProvider, - statsLogger); - } - // 3. recover the incomplete log segments - return writeHandler.recoverIncompleteLogSegments() - .map(new AbstractFunction1<Long, AsyncLogWriter>() { - @Override - public AsyncLogWriter apply(Long lastTxId) { - // 4. 
update last tx id if successfully recovered - writer.setLastTxId(lastTxId); - return writer; - } - }).onFailure(new AbstractFunction1<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable cause) { - // 5. close the writer if recovery failed - writer.asyncAbort(); - return BoxedUnit.UNIT; - } - }); + return createWriteHandleFuture.thenCompose(writeHandler -> { + final BKAsyncLogWriter writer; + synchronized (BKDistributedLogManager.this) { + // 2. create the writer with the handler + writer = new BKAsyncLogWriter( + conf, + dynConf, + BKDistributedLogManager.this, + writeHandler, + featureProvider, + statsLogger); } + // 3. recover the incomplete log segments + return writeHandler.recoverIncompleteLogSegments() + .thenApply(lastTxId -> { + // 4. update last tx id if successfully recovered + writer.setLastTxId(lastTxId); + return (AsyncLogWriter) writer; + }) + .whenComplete((lastTxId, cause) -> { + if (null != cause) { + // 5. close the writer if recovery failed + writer.asyncAbort(); + } + }); }); } @Override - public Future<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) { - return getLogSegmentsAsync().flatMap(new AbstractFunction1<List<LogSegmentMetadata>, Future<DLSN>>() { - @Override - public Future<DLSN> apply(List<LogSegmentMetadata> segments) { - return getDLSNNotLessThanTxId(fromTxnId, segments); - } - }); + public CompletableFuture<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) { + return getLogSegmentsAsync().thenCompose(segments -> getDLSNNotLessThanTxId(fromTxnId, segments)); } - private Future<DLSN> getDLSNNotLessThanTxId(long fromTxnId, + private CompletableFuture<DLSN> getDLSNNotLessThanTxId(long fromTxnId, final List<LogSegmentMetadata> segments) { --- End diff -- Please also fix the code alignment here, and make the same alignment fix at lines 565, 707, and 838 of this file.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---