Github user jiazhai commented on a diff in the pull request:

    
https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984594
  
    --- Diff: 
distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
 ---
    @@ -525,75 +495,63 @@ public BKSyncLogWriter 
startLogSegmentNonPartitioned() throws IOException {
          */
         @Override
         public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws 
IOException {
    -        return (BKAsyncLogWriter) FutureUtils.result(openAsyncLogWriter());
    +        return (BKAsyncLogWriter) Utils.ioResult(openAsyncLogWriter());
         }
     
         @Override
    -    public Future<AsyncLogWriter> openAsyncLogWriter() {
    +    public CompletableFuture<AsyncLogWriter> openAsyncLogWriter() {
             try {
                 checkClosedOrInError("startLogSegmentNonPartitioned");
             } catch (AlreadyClosedException e) {
    -            return Future.exception(e);
    +            return FutureUtils.exception(e);
             }
     
    -        Future<BKLogWriteHandler> createWriteHandleFuture;
    +        CompletableFuture<BKLogWriteHandler> createWriteHandleFuture;
             synchronized (this) {
                 // 1. create the locked write handler
                 createWriteHandleFuture = asyncCreateWriteHandler(true);
             }
    -        return createWriteHandleFuture.flatMap(new 
AbstractFunction1<BKLogWriteHandler, Future<AsyncLogWriter>>() {
    -            @Override
    -            public Future<AsyncLogWriter> apply(final BKLogWriteHandler 
writeHandler) {
    -                final BKAsyncLogWriter writer;
    -                synchronized (BKDistributedLogManager.this) {
    -                    // 2. create the writer with the handler
    -                    writer = new BKAsyncLogWriter(
    -                            conf,
    -                            dynConf,
    -                            BKDistributedLogManager.this,
    -                            writeHandler,
    -                            featureProvider,
    -                            statsLogger);
    -                }
    -                // 3. recover the incomplete log segments
    -                return writeHandler.recoverIncompleteLogSegments()
    -                        .map(new AbstractFunction1<Long, AsyncLogWriter>() 
{
    -                            @Override
    -                            public AsyncLogWriter apply(Long lastTxId) {
    -                                // 4. update last tx id if successfully 
recovered
    -                                writer.setLastTxId(lastTxId);
    -                                return writer;
    -                            }
    -                        }).onFailure(new AbstractFunction1<Throwable, 
BoxedUnit>() {
    -                            @Override
    -                            public BoxedUnit apply(Throwable cause) {
    -                                // 5. close the writer if recovery failed
    -                                writer.asyncAbort();
    -                                return BoxedUnit.UNIT;
    -                            }
    -                        });
    +        return createWriteHandleFuture.thenCompose(writeHandler -> {
    +            final BKAsyncLogWriter writer;
    +            synchronized (BKDistributedLogManager.this) {
    +                // 2. create the writer with the handler
    +                writer = new BKAsyncLogWriter(
    +                        conf,
    +                        dynConf,
    +                        BKDistributedLogManager.this,
    +                        writeHandler,
    +                        featureProvider,
    +                        statsLogger);
                 }
    +            // 3. recover the incomplete log segments
    +            return writeHandler.recoverIncompleteLogSegments()
    +                .thenApply(lastTxId -> {
    +                    // 4. update last tx id if successfully recovered
    +                    writer.setLastTxId(lastTxId);
    +                    return (AsyncLogWriter) writer;
    +                })
    +                .whenComplete((lastTxId, cause) -> {
    +                    if (null != cause) {
    +                        // 5. close the writer if recovery failed
    +                        writer.asyncAbort();
    +                    }
    +                });
             });
         }
     
         @Override
    -    public Future<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) {
    -        return getLogSegmentsAsync().flatMap(new 
AbstractFunction1<List<LogSegmentMetadata>, Future<DLSN>>() {
    -            @Override
    -            public Future<DLSN> apply(List<LogSegmentMetadata> segments) {
    -                return getDLSNNotLessThanTxId(fromTxnId, segments);
    -            }
    -        });
    +    public CompletableFuture<DLSN> getDLSNNotLessThanTxId(final long 
fromTxnId) {
    +        return getLogSegmentsAsync().thenCompose(segments -> 
getDLSNNotLessThanTxId(fromTxnId, segments));
         }
     
    -    private Future<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
    +    private CompletableFuture<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
                                                     final 
List<LogSegmentMetadata> segments) {
    --- End diff --
    
    Please also do the code alignment change in this file, also at line 565, 
707, 838.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to