http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java index eedfbd6..26a4a76 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java @@ -21,39 +21,33 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.common.base.Ticker; -import org.apache.distributedlog.exceptions.DLIllegalStateException; -import org.apache.distributedlog.exceptions.DLInterruptedException; -import org.apache.distributedlog.exceptions.EndOfStreamException; -import org.apache.distributedlog.exceptions.IdleReaderException; -import org.apache.distributedlog.exceptions.LogNotFoundException; -import org.apache.distributedlog.exceptions.ReadCancelledException; -import org.apache.distributedlog.exceptions.UnexpectedException; -import org.apache.distributedlog.util.OrderedScheduler; -import org.apache.distributedlog.util.Utils; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import com.twitter.util.Throw; - import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; - +import java.util.function.Function; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.versioning.Versioned; +import org.apache.distributedlog.api.AsyncLogReader; +import org.apache.distributedlog.exceptions.DLIllegalStateException; +import org.apache.distributedlog.exceptions.DLInterruptedException; +import org.apache.distributedlog.exceptions.EndOfStreamException; +import org.apache.distributedlog.exceptions.IdleReaderException; +import org.apache.distributedlog.exceptions.LogNotFoundException; +import org.apache.distributedlog.exceptions.ReadCancelledException; +import org.apache.distributedlog.exceptions.UnexpectedException; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; +import org.apache.distributedlog.util.OrderedScheduler; +import org.apache.distributedlog.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Function1; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; /** * BookKeeper based {@link AsyncLogReader} implementation. @@ -76,13 +70,8 @@ import scala.runtime.BoxedUnit; class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReader.class); - private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION = - new AbstractFunction1<List<LogRecordWithDLSN>, LogRecordWithDLSN>() { - @Override - public LogRecordWithDLSN apply(List<LogRecordWithDLSN> records) { - return records.get(0); - } - }; + private static final Function<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION = + records -> records.get(0); private final String streamName; protected final BKDistributedLogManager bkDistributedLogManager; @@ -104,7 +93,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { // last process time private final Stopwatch lastProcessTime; - protected Promise<Void> closeFuture = null; + protected CompletableFuture<Void> closeFuture = null; private boolean lockStream = false; @@ -143,7 +132,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { private final Stopwatch enqueueTime; private final int numEntries; private final List<LogRecordWithDLSN> records; - private final Promise<List<LogRecordWithDLSN>> promise; + private final CompletableFuture<List<LogRecordWithDLSN>> promise; private final long deadlineTime; private final TimeUnit deadlineTimeUnit; @@ -158,12 +147,12 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { } else { this.records = new ArrayList<LogRecordWithDLSN>(); } - this.promise = new Promise<List<LogRecordWithDLSN>>(); + this.promise = new CompletableFuture<List<LogRecordWithDLSN>>(); this.deadlineTime = deadlineTime; this.deadlineTimeUnit = deadlineTimeUnit; } - Promise<List<LogRecordWithDLSN>> getPromise() { + CompletableFuture<List<LogRecordWithDLSN>> getPromise() { return promise; } @@ -171,9 +160,9 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { return enqueueTime.elapsed(timeUnit); } - void setException(Throwable throwable) { + void completeExceptionally(Throwable throwable) { Stopwatch stopwatch = Stopwatch.createStarted(); - if (promise.updateIfEmpty(new Throw<List<LogRecordWithDLSN>>(throwable))) { + if (promise.completeExceptionally(throwable)) { futureSetLatency.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS)); } @@ -204,7 +193,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { } delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS)); Stopwatch stopwatch = Stopwatch.createStarted(); - promise.setValue(records); + promise.complete(records); futureSetLatency.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); } } @@ -333,7 +322,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { return startDLSN; } - public Future<Void> lockStream() { + public CompletableFuture<Void> lockStream() { this.lockStream = true; return readHandler.lockStream(); } @@ -381,16 +370,16 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { * @return A promise that when satisfied will contain the Log Record with its DLSN. */ @Override - public synchronized Future<LogRecordWithDLSN> readNext() { - return readInternal(1, 0, TimeUnit.MILLISECONDS).map(READ_NEXT_MAP_FUNCTION); + public synchronized CompletableFuture<LogRecordWithDLSN> readNext() { + return readInternal(1, 0, TimeUnit.MILLISECONDS).thenApply(READ_NEXT_MAP_FUNCTION); } - public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries) { + public synchronized CompletableFuture<List<LogRecordWithDLSN>> readBulk(int numEntries) { return readInternal(numEntries, 0, TimeUnit.MILLISECONDS); } @Override - public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries, + public synchronized CompletableFuture<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit) { return readInternal(numEntries, waitTime, timeUnit); @@ -404,7 +393,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { * num entries to read * @return A promise that satisfied with a non-empty list of log records with their DLSN. */ - private synchronized Future<List<LogRecordWithDLSN>> readInternal(int numEntries, + private synchronized CompletableFuture<List<LogRecordWithDLSN>> readInternal(int numEntries, long deadlineTime, TimeUnit deadlineTimeUnit) { timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS)); @@ -421,19 +410,15 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { bkDistributedLogManager.getScheduler(), Ticker.systemTicker(), bkDistributedLogManager.alertStatsLogger); - readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() { + readHandler.checkLogStreamExists().whenComplete(new FutureEventListener<Void>() { @Override public void onSuccess(Void value) { try { readHandler.registerListener(readAheadEntryReader); readHandler.asyncStartFetchLogSegments() - .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() { - @Override - public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) { - readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this); - readAheadEntryReader.start(logSegments.getValue()); - return BoxedUnit.UNIT; - } + .thenAccept(logSegments -> { + readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this); + readAheadEntryReader.start(logSegments.getValue()); }); } catch (Exception exc) { notifyOnError(exc); @@ -448,7 +433,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { } if (checkClosedOrInError("readNext")) { - readRequest.setException(lastException.get()); + readRequest.completeExceptionally(lastException.get()); } else { boolean queueEmpty = pendingRequests.isEmpty(); pendingRequests.add(readRequest); @@ -478,15 +463,15 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { } @Override - public Future<Void> asyncClose() { + public CompletableFuture<Void> asyncClose() { // Cancel the idle reader timeout task, interrupting if necessary ReadCancelledException exception; - Promise<Void> closePromise; + CompletableFuture<Void> closePromise; synchronized (this) { if (null != closeFuture) { return closeFuture; } - closePromise = closeFuture = new Promise<Void>(); + closePromise = closeFuture = new CompletableFuture<Void>(); exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed"); setLastException(exception); } @@ -507,16 +492,18 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { readHandler.unregisterListener(readAheadReader); readAheadReader.removeStateChangeNotification(this); } - Utils.closeSequence(bkDistributedLogManager.getScheduler(), true, - readAheadReader, - readHandler - ).proxyTo(closePromise); + FutureUtils.proxyTo( + Utils.closeSequence(bkDistributedLogManager.getScheduler(), true, + readAheadReader, + readHandler + ), + closePromise); return closePromise; } private void cancelAllPendingReads(Throwable throwExc) { for (PendingReadRequest promise : pendingRequests) { - promise.setException(throwExc); + promise.completeExceptionally(throwExc); } pendingRequests.clear(); } @@ -591,7 +578,8 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { } if (disableProcessingReadRequests) { - LOG.info("Reader of {} is forced to stop processing read requests", readHandler.getFullyQualifiedName()); + LOG.info("Reader of {} is forced to stop processing read requests", + readHandler.getFullyQualifiedName()); return; } } @@ -601,9 +589,9 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { // the reader in error and abort all pending reads since we dont // know the last consumed read if (null == lastException.get()) { - if (nextRequest.getPromise().isInterrupted().isDefined()) { - setLastException(new DLInterruptedException("Interrupted on reading " + readHandler.getFullyQualifiedName() + " : ", - nextRequest.getPromise().isInterrupted().get())); + if (nextRequest.getPromise().isCancelled()) { + setLastException(new DLInterruptedException("Interrupted on reading " + + readHandler.getFullyQualifiedName())); } } @@ -680,9 +668,9 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { } else { DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at dlsn = " + nextRequest.records.get(0).getDlsn()); - nextRequest.setException(ise); + nextRequest.completeExceptionally(ise); if (null != request) { - request.setException(ise); + request.completeExceptionally(ise); } // We should never get here as we should have exited the loop if // pendingRequests were empty
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java index a1b1d5c..62b32f2 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java @@ -19,33 +19,30 @@ package org.apache.distributedlog; import com.google.common.base.Stopwatch; import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; import org.apache.distributedlog.exceptions.StreamNotReadyException; import org.apache.distributedlog.exceptions.WriteCancelledException; import org.apache.distributedlog.exceptions.WriteException; import org.apache.distributedlog.feature.CoreFeatureKeys; import org.apache.distributedlog.util.FailpointUtils; -import org.apache.distributedlog.util.FutureUtils; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import com.twitter.util.Try; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; +import org.apache.distributedlog.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Function1; -import scala.Option; -import scala.runtime.AbstractFunction1; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; /** * BookKeeper based {@link AsyncLogWriter} implementation. @@ -70,35 +67,30 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogWriter.class); - static Function1<List<LogSegmentMetadata>, Boolean> TruncationResultConverter = - new AbstractFunction1<List<LogSegmentMetadata>, Boolean>() { - @Override - public Boolean apply(List<LogSegmentMetadata> segments) { - return true; - } - }; + static Function<List<LogSegmentMetadata>, Boolean> TruncationResultConverter = + segments -> true; // Records pending for roll log segment. class PendingLogRecord implements FutureEventListener<DLSN> { final LogRecord record; - final Promise<DLSN> promise; + final CompletableFuture<DLSN> promise; final boolean flush; PendingLogRecord(LogRecord record, boolean flush) { this.record = record; - this.promise = new Promise<DLSN>(); + this.promise = new CompletableFuture<DLSN>(); this.flush = flush; } @Override public void onSuccess(DLSN value) { - promise.setValue(value); + promise.complete(value); } @Override public void onFailure(Throwable cause) { - promise.setException(cause); + promise.completeExceptionally(cause); encounteredError = true; } } @@ -135,7 +127,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { private final boolean disableRollOnSegmentError; private LinkedList<PendingLogRecord> pendingRequests = null; private volatile boolean encounteredError = false; - private Promise<BKLogSegmentWriter> rollingFuture = null; + private CompletableFuture<BKLogSegmentWriter> rollingFuture = null; private long lastTxId = DistributedLogConstants.INVALID_TXID; private final StatsLogger statsLogger; @@ -186,7 +178,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { * log record * @return future of the write */ - public Future<DLSN> writeControlRecord(final LogRecord record) { + public CompletableFuture<DLSN> writeControlRecord(final LogRecord record) { record.setControl(); return write(record); } @@ -206,7 +198,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { } } - private Future<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid, + private CompletableFuture<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid, boolean bestEffort, boolean rollLog, boolean allowMaxTxID) { @@ -217,24 +209,20 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { stopwatch); } - private Future<BKLogSegmentWriter> doGetLogSegmentWriter(final long firstTxid, + private CompletableFuture<BKLogSegmentWriter> doGetLogSegmentWriter(final long firstTxid, final boolean bestEffort, final boolean rollLog, final boolean allowMaxTxID) { if (encounteredError) { - return Future.exception(new WriteException(bkDistributedLogManager.getStreamName(), + return FutureUtils.exception(new WriteException(bkDistributedLogManager.getStreamName(), "writer has been closed due to error.")); } - Future<BKLogSegmentWriter> writerFuture = asyncGetLedgerWriter(!disableRollOnSegmentError); + CompletableFuture<BKLogSegmentWriter> writerFuture = asyncGetLedgerWriter(!disableRollOnSegmentError); if (null == writerFuture) { return rollLogSegmentIfNecessary(null, firstTxid, bestEffort, allowMaxTxID); } else if (rollLog) { - return writerFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() { - @Override - public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter writer) { - return rollLogSegmentIfNecessary(writer, firstTxid, bestEffort, allowMaxTxID); - } - }); + return writerFuture.thenCompose( + writer -> rollLogSegmentIfNecessary(writer, firstTxid, bestEffort, allowMaxTxID)); } else { return writerFuture; } @@ -244,20 +232,20 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { * We write end of stream marker by writing a record with MAX_TXID, so we need to allow using * max txid when rolling for this case only. */ - private Future<BKLogSegmentWriter> getLogSegmentWriterForEndOfStream() { + private CompletableFuture<BKLogSegmentWriter> getLogSegmentWriterForEndOfStream() { return getLogSegmentWriter(DistributedLogConstants.MAX_TXID, false /* bestEffort */, false /* roll log */, true /* allow max txid */); } - private Future<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid, + private CompletableFuture<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid, boolean bestEffort, boolean rollLog) { return getLogSegmentWriter(firstTxid, bestEffort, rollLog, false /* allow max txid */); } - Future<DLSN> queueRequest(LogRecord record, boolean flush) { + CompletableFuture<DLSN> queueRequest(LogRecord record, boolean flush) { PendingLogRecord pendingLogRecord = new PendingLogRecord(record, flush); pendingRequests.add(pendingLogRecord); return pendingLogRecord.promise; @@ -276,25 +264,25 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { void startQueueingRequests() { assert(null == pendingRequests && null == rollingFuture); pendingRequests = new LinkedList<PendingLogRecord>(); - rollingFuture = new Promise<BKLogSegmentWriter>(); + rollingFuture = new CompletableFuture<BKLogSegmentWriter>(); } // for ordering guarantee, we shouldn't send requests to next log segments until // previous log segment is done. - private synchronized Future<DLSN> asyncWrite(final LogRecord record, + private synchronized CompletableFuture<DLSN> asyncWrite(final LogRecord record, boolean flush) { // The passed in writer may be stale since we acquire the writer outside of sync // lock. If we recently rolled and the new writer is cached, use that instead. - Future<DLSN> result = null; + CompletableFuture<DLSN> result = null; BKLogSegmentWriter w; try { w = getCachedLogSegmentWriter(); } catch (WriteException we) { - return Future.exception(we); + return FutureUtils.exception(we); } if (null != rollingFuture) { if (streamFailFast) { - result = Future.exception(new StreamNotReadyException("Rolling log segment")); + result = FutureUtils.exception(new StreamNotReadyException("Rolling log segment")); } else { result = queueRequest(record, flush); } @@ -303,7 +291,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { startQueueingRequests(); if (null != w) { LastPendingLogRecord lastLogRecordInCurrentSegment = new LastPendingLogRecord(record, flush); - w.asyncWrite(record, true).addEventListener(lastLogRecordInCurrentSegment); + w.asyncWrite(record, true).whenComplete(lastLogRecordInCurrentSegment); result = lastLogRecordInCurrentSegment.promise; } else { // no log segment yet. roll the log segment and issue pending requests. result = queueRequest(record, flush); @@ -314,26 +302,22 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { } // use map here rather than onSuccess because we want lastTxId to be updated before // satisfying the future - return result.map(new AbstractFunction1<DLSN, DLSN>() { - @Override - public DLSN apply(DLSN dlsn) { - setLastTxId(record.getTransactionId()); - return dlsn; - } + return result.thenApply(dlsn -> { + setLastTxId(record.getTransactionId()); + return dlsn; }); } - private List<Future<DLSN>> asyncWriteBulk(List<LogRecord> records) { - final ArrayList<Future<DLSN>> results = new ArrayList<Future<DLSN>>(records.size()); + private List<CompletableFuture<DLSN>> asyncWriteBulk(List<LogRecord> records) { + final ArrayList<CompletableFuture<DLSN>> results = new ArrayList<CompletableFuture<DLSN>>(records.size()); Iterator<LogRecord> iterator = records.iterator(); while (iterator.hasNext()) { LogRecord record = iterator.next(); - Future<DLSN> future = asyncWrite(record, !iterator.hasNext()); + CompletableFuture<DLSN> future = asyncWrite(record, !iterator.hasNext()); results.add(future); // Abort early if an individual write has already failed. - Option<Try<DLSN>> result = future.poll(); - if (result.isDefined() && result.get().isThrow()) { + if (future.isDone() && future.isCompletedExceptionally()) { break; } } @@ -343,18 +327,18 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { return results; } - private void appendCancelledFutures(List<Future<DLSN>> futures, int numToAdd) { + private void appendCancelledFutures(List<CompletableFuture<DLSN>> futures, int numToAdd) { final WriteCancelledException cre = new WriteCancelledException(getStreamName()); for (int i = 0; i < numToAdd; i++) { - Future<DLSN> cancelledFuture = Future.exception(cre); + CompletableFuture<DLSN> cancelledFuture = FutureUtils.exception(cre); futures.add(cancelledFuture); } } private void rollLogSegmentAndIssuePendingRequests(final long firstTxId) { getLogSegmentWriter(firstTxId, true, true) - .addEventListener(new FutureEventListener<BKLogSegmentWriter>() { + .whenComplete(new FutureEventListener<BKLogSegmentWriter>() { @Override public void onSuccess(BKLogSegmentWriter writer) { try { @@ -362,7 +346,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { for (PendingLogRecord pendingLogRecord : pendingRequests) { FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_LogWriterIssuePending); writer.asyncWrite(pendingLogRecord.record, pendingLogRecord.flush) - .addEventListener(pendingLogRecord); + .whenComplete(pendingLogRecord); } // if there are no records in the pending queue, let's write a control record // so that when a new log segment is rolled, a control record will be added and @@ -373,10 +357,10 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { controlRecord.setControl(); PendingLogRecord controlReq = new PendingLogRecord(controlRecord, false); writer.asyncWrite(controlReq.record, controlReq.flush) - .addEventListener(controlReq); + .whenComplete(controlReq); } if (null != rollingFuture) { - FutureUtils.setValue(rollingFuture, writer); + FutureUtils.complete(rollingFuture, writer); } rollingFuture = null; pendingRequestDispatch.add(pendingRequests.size()); @@ -401,7 +385,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { encounteredError = errorOutWriter; pendingRequests = null; if (null != rollingFuture) { - FutureUtils.setException(rollingFuture, cause); + FutureUtils.completeExceptionally(rollingFuture, cause); } rollingFuture = null; } @@ -411,7 +395,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { // After erroring out the writer above, no more requests // will be enqueued to pendingRequests for (PendingLogRecord pendingLogRecord : pendingRequestsSnapshot) { - pendingLogRecord.promise.setException(cause); + pendingLogRecord.promise.completeExceptionally(cause); } } @@ -425,7 +409,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { * @param record single log record */ @Override - public Future<DLSN> write(final LogRecord record) { + public CompletableFuture<DLSN> write(final LogRecord record) { final Stopwatch stopwatch = Stopwatch.createStarted(); return FutureUtils.stats( asyncWrite(record, true), @@ -442,30 +426,30 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { * @param records list of records */ @Override - public Future<List<Future<DLSN>>> writeBulk(final List<LogRecord> records) { + public CompletableFuture<List<CompletableFuture<DLSN>>> writeBulk(final List<LogRecord> records) { final Stopwatch stopwatch = Stopwatch.createStarted(); return FutureUtils.stats( - Future.value(asyncWriteBulk(records)), + FutureUtils.value(asyncWriteBulk(records)), bulkWriteOpStatsLogger, stopwatch); } @Override - public Future<Boolean> truncate(final DLSN dlsn) { + public CompletableFuture<Boolean> truncate(final DLSN dlsn) { if (DLSN.InvalidDLSN == dlsn) { - return Future.value(false); + return FutureUtils.value(false); } BKLogWriteHandler writeHandler; try { writeHandler = getWriteHandler(); } catch (IOException e) { - return Future.exception(e); + return FutureUtils.exception(e); } - return writeHandler.setLogSegmentsOlderThanDLSNTruncated(dlsn).map(TruncationResultConverter); + return writeHandler.setLogSegmentsOlderThanDLSNTruncated(dlsn).thenApply(TruncationResultConverter); } - Future<Long> flushAndCommit() { - Future<BKLogSegmentWriter> writerFuture; + CompletableFuture<Long> flushAndCommit() { + CompletableFuture<BKLogSegmentWriter> writerFuture; synchronized (this) { if (null != this.rollingFuture) { writerFuture = this.rollingFuture; @@ -474,19 +458,14 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { } } if (null == writerFuture) { - return Future.value(getLastTxId()); + return FutureUtils.value(getLastTxId()); } - return writerFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() { - @Override - public Future<Long> apply(BKLogSegmentWriter writer) { - return writer.flushAndCommit(); - } - }); + return writerFuture.thenCompose(writer -> writer.flushAndCommit()); } - Future<Long> markEndOfStream() { + CompletableFuture<Long> markEndOfStream() { final Stopwatch stopwatch = Stopwatch.createStarted(); - Future<BKLogSegmentWriter> logSegmentWriterFuture; + CompletableFuture<BKLogSegmentWriter> logSegmentWriterFuture; synchronized (this) { logSegmentWriterFuture = this.rollingFuture; } @@ -495,19 +474,14 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { } return FutureUtils.stats( - logSegmentWriterFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() { - @Override - public Future<Long> apply(BKLogSegmentWriter w) { - return w.markEndOfStream(); - } - }), + logSegmentWriterFuture.thenCompose(w -> w.markEndOfStream()), markEndOfStreamOpStatsLogger, stopwatch); } @Override - protected Future<Void> asyncCloseAndComplete() { - Future<BKLogSegmentWriter> logSegmentWriterFuture; + protected CompletableFuture<Void> asyncCloseAndComplete() { + CompletableFuture<BKLogSegmentWriter> logSegmentWriterFuture; synchronized (this) { logSegmentWriterFuture = this.rollingFuture; } @@ -515,18 +489,13 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { if (null == logSegmentWriterFuture) { return super.asyncCloseAndComplete(); } else { - return logSegmentWriterFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Void>>() { - @Override - public Future<Void> apply(BKLogSegmentWriter segmentWriter) { - return BKAsyncLogWriter.super.asyncCloseAndComplete(); - } - }); + return logSegmentWriterFuture.thenCompose(segmentWriter1 -> super.asyncCloseAndComplete()); } } @Override void closeAndComplete() throws IOException { - FutureUtils.result(asyncCloseAndComplete()); + Utils.ioResult(asyncCloseAndComplete()); } /** @@ -539,12 +508,12 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter { } @Override - public Future<Void> asyncAbort() { - Future<Void> result = super.asyncAbort(); + public CompletableFuture<Void> asyncAbort() { + CompletableFuture<Void> result = super.asyncAbort(); synchronized (this) { if (pendingRequests != null) { for (PendingLogRecord pendingLogRecord : pendingRequests) { - pendingLogRecord.promise.setException(new WriteException(bkDistributedLogManager.getStreamName(), + pendingLogRecord.promise.completeExceptionally(new WriteException(bkDistributedLogManager.getStreamName(), "abort wring: writer has been closed due to error.")); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java index 00e6b5c..cffe500 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java @@ -20,14 +20,22 @@ package org.apache.distributedlog; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.stats.AlertStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.distributedlog.api.AsyncLogReader; +import org.apache.distributedlog.api.AsyncLogWriter; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; import org.apache.distributedlog.callback.LogSegmentListener; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; import org.apache.distributedlog.exceptions.AlreadyClosedException; import org.apache.distributedlog.exceptions.LogEmptyException; import org.apache.distributedlog.exceptions.LogNotFoundException; import org.apache.distributedlog.exceptions.UnexpectedException; -import org.apache.distributedlog.function.CloseAsyncCloseableFunction; -import org.apache.distributedlog.function.GetVersionedValueFunction; import org.apache.distributedlog.injector.AsyncFailureInjector; import org.apache.distributedlog.logsegment.LogSegmentEntryStore; import org.apache.distributedlog.logsegment.LogSegmentEntryWriter; @@ -41,40 +49,25 @@ import org.apache.distributedlog.logsegment.LogSegmentFilter; import org.apache.distributedlog.logsegment.LogSegmentMetadataCache; import org.apache.distributedlog.metadata.LogStreamMetadataStore; import org.apache.distributedlog.namespace.NamespaceDriver; -import org.apache.distributedlog.stats.BroadCastStatsLogger; -import org.apache.distributedlog.subscription.SubscriptionsStore; +import org.apache.distributedlog.common.stats.BroadCastStatsLogger; +import org.apache.distributedlog.api.subscription.SubscriptionsStore; import org.apache.distributedlog.util.Allocator; import org.apache.distributedlog.util.DLUtils; -import org.apache.distributedlog.util.FutureUtils; -import org.apache.distributedlog.util.MonitoredFuturePool; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; -import org.apache.distributedlog.util.PermitLimiter; -import org.apache.distributedlog.util.PermitManager; -import org.apache.distributedlog.util.SchedulerUtils; +import org.apache.distributedlog.common.util.PermitLimiter; +import org.apache.distributedlog.common.util.PermitManager; import org.apache.distributedlog.util.Utils; -import com.twitter.util.ExceptionalFunction; -import com.twitter.util.ExceptionalFunction0; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.stats.AlertStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction0; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; -import java.io.Closeable; import java.io.IOException; import java.net.URI; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import static org.apache.distributedlog.namespace.NamespaceDriver.Role.READER; import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER; @@ -104,20 +97,10 @@ class BKDistributedLogManager implements DistributedLogManager { static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogManager.class); static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION = - new Function<LogRecordWithDLSN, Long>() { - @Override - public Long apply(LogRecordWithDLSN record) { - return record.getTransactionId(); - } - }; + record -> record.getTransactionId(); static final Function<LogRecordWithDLSN, DLSN> RECORD_2_DLSN_FUNCTION = - new Function<LogRecordWithDLSN, DLSN>() { - @Override - public DLSN apply(LogRecordWithDLSN record) { - return record.getDlsn(); - } - }; + record -> record.getDlsn(); private final URI uri; private final String name; @@ -127,7 +110,7 @@ class BKDistributedLogManager implements DistributedLogManager { private final DistributedLogConfiguration conf; private final DynamicDistributedLogConfiguration dynConf; private final NamespaceDriver driver; - private Promise<Void> closePromise; + private CompletableFuture<Void> closePromise; private final OrderedScheduler scheduler; private final FeatureProvider featureProvider; private final AsyncFailureInjector failureInjector; @@ -272,17 +255,18 @@ class BKDistributedLogManager implements DistributedLogManager { @Override public List<LogSegmentMetadata> getLogSegments() throws IOException { - return FutureUtils.result(getLogSegmentsAsync()); + return Utils.ioResult(getLogSegmentsAsync()); } - protected Future<List<LogSegmentMetadata>> getLogSegmentsAsync() { + protected CompletableFuture<List<LogSegmentMetadata>> getLogSegmentsAsync() { final BKLogReadHandler readHandler = createReadHandler(); return readHandler.readLogSegmentsFromStore( LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, - null) - .map(GetVersionedValueFunction.GET_LOGSEGMENT_LIST_FUNC) - .ensure(CloseAsyncCloseableFunction.of(readHandler)); + null + ) + .thenApply((versionedList) -> versionedList.getValue()) + .whenComplete((value, cause) -> readHandler.asyncClose()); } @Override @@ -353,29 +337,26 @@ class BKDistributedLogManager implements DistributedLogManager { public BKLogWriteHandler createWriteHandler(boolean lockHandler) throws IOException { - return FutureUtils.result(asyncCreateWriteHandler(lockHandler)); + return Utils.ioResult(asyncCreateWriteHandler(lockHandler)); } - Future<BKLogWriteHandler> asyncCreateWriteHandler(final boolean lockHandler) { + CompletableFuture<BKLogWriteHandler> asyncCreateWriteHandler(final boolean lockHandler) { // Fetching Log Metadata (create if not exists) return driver.getLogStreamMetadataStore(WRITER).getLog( uri, name, true, conf.getCreateStreamIfNotExists() - ).flatMap(new AbstractFunction1<LogMetadataForWriter, Future<BKLogWriteHandler>>() { - @Override - public Future<BKLogWriteHandler> apply(LogMetadataForWriter logMetadata) { - Promise<BKLogWriteHandler> createPromise = new Promise<BKLogWriteHandler>(); - createWriteHandler(logMetadata, lockHandler, createPromise); - return createPromise; - } + ).thenCompose(logMetadata -> { + CompletableFuture<BKLogWriteHandler> createPromise = new CompletableFuture<BKLogWriteHandler>(); + createWriteHandler(logMetadata, lockHandler, createPromise); + return createPromise; }); } private void createWriteHandler(LogMetadataForWriter logMetadata, boolean lockHandler, - final Promise<BKLogWriteHandler> createPromise) { + final CompletableFuture<BKLogWriteHandler> createPromise) { // Build the locks DistributedLock lock; if (conf.isWriteLockEnabled()) { @@ -389,7 +370,7 @@ class BKDistributedLogManager implements DistributedLogManager { segmentAllocator = driver.getLogSegmentEntryStore(WRITER) .newLogSegmentAllocator(logMetadata, dynConf); } catch (IOException ioe) { - FutureUtils.setException(createPromise, ioe); + FutureUtils.completeExceptionally(createPromise, ioe); return; } @@ -412,25 +393,21 @@ class BKDistributedLogManager implements DistributedLogManager { dynConf, lock); if (lockHandler) { - writeHandler.lockHandler().addEventListener(new FutureEventListener<DistributedLock>() { + writeHandler.lockHandler().whenComplete(new FutureEventListener<DistributedLock>() { @Override public void onSuccess(DistributedLock lock) { - FutureUtils.setValue(createPromise, writeHandler); + FutureUtils.complete(createPromise, writeHandler); } @Override public void onFailure(final Throwable cause) { - writeHandler.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - FutureUtils.setException(createPromise, cause); - return BoxedUnit.UNIT; - } - }); + FutureUtils.ensure( + writeHandler.asyncClose(), + () -> FutureUtils.completeExceptionally(createPromise, cause)); } }); } else { - FutureUtils.setValue(createPromise, writeHandler); + FutureUtils.complete(createPromise, writeHandler); } } @@ -438,18 +415,15 @@ class BKDistributedLogManager implements DistributedLogManager { return driver.getLogStreamMetadataStore(WRITER).getPermitManager(); } - <T> Future<T> processReaderOperation(final Function<BKLogReadHandler, Future<T>> func) { - return scheduler.apply(new ExceptionalFunction0<BKLogReadHandler>() { - @Override - public BKLogReadHandler applyE() throws Throwable { - return getReadHandlerAndRegisterListener(true, null); - } - }).flatMap(new ExceptionalFunction<BKLogReadHandler, Future<T>>() { - @Override - public Future<T> applyE(final BKLogReadHandler readHandler) throws Throwable { - return func.apply(readHandler); - } + <T> CompletableFuture<T> processReaderOperation(final Function<BKLogReadHandler, CompletableFuture<T>> func) { + CompletableFuture<T> future = FutureUtils.createFuture(); + scheduler.submit(() -> { + BKLogReadHandler readHandler = getReadHandlerAndRegisterListener(true, null); + FutureUtils.proxyTo( + func.apply(readHandler), + future); }); + return future; } /** @@ -461,7 +435,7 @@ class BKDistributedLogManager implements DistributedLogManager { @Override public boolean isEndOfStreamMarked() throws IOException { checkClosedOrInError("isEndOfStreamMarked"); - long lastTxId = FutureUtils.result(getLastLogRecordAsyncInternal(false, true)).getTransactionId(); + long lastTxId = Utils.ioResult(getLastLogRecordAsyncInternal(false, true)).getTransactionId(); return lastTxId == DistributedLogConstants.MAX_TXID; } @@ -473,7 +447,7 @@ class BKDistributedLogManager implements DistributedLogManager { public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException { long position; try { - position = FutureUtils.result(getLastLogRecordAsyncInternal(true, false)).getTransactionId(); + position = Utils.ioResult(getLastLogRecordAsyncInternal(true, false)).getTransactionId(); if (DistributedLogConstants.INVALID_TXID == position || DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID == position) { position = 0; @@ -508,7 +482,7 @@ class BKDistributedLogManager implements DistributedLogManager { try { writer.createAndCacheWriteHandler(); BKLogWriteHandler writeHandler = writer.getWriteHandler(); - FutureUtils.result(writeHandler.lockHandler()); + Utils.ioResult(writeHandler.lockHandler()); success = true; return writer; } finally { @@ -525,75 +499,63 @@ class BKDistributedLogManager implements DistributedLogManager { */ @Override public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException { - return (BKAsyncLogWriter) FutureUtils.result(openAsyncLogWriter()); + return (BKAsyncLogWriter) Utils.ioResult(openAsyncLogWriter()); } @Override - public Future<AsyncLogWriter> openAsyncLogWriter() { + public CompletableFuture<AsyncLogWriter> openAsyncLogWriter() { try { checkClosedOrInError("startLogSegmentNonPartitioned"); } catch (AlreadyClosedException e) { - return Future.exception(e); + return FutureUtils.exception(e); } - Future<BKLogWriteHandler> createWriteHandleFuture; + CompletableFuture<BKLogWriteHandler> createWriteHandleFuture; synchronized (this) { // 1. create the locked write handler createWriteHandleFuture = asyncCreateWriteHandler(true); } - return createWriteHandleFuture.flatMap(new AbstractFunction1<BKLogWriteHandler, Future<AsyncLogWriter>>() { - @Override - public Future<AsyncLogWriter> apply(final BKLogWriteHandler writeHandler) { - final BKAsyncLogWriter writer; - synchronized (BKDistributedLogManager.this) { - // 2. create the writer with the handler - writer = new BKAsyncLogWriter( - conf, - dynConf, - BKDistributedLogManager.this, - writeHandler, - featureProvider, - statsLogger); - } - // 3. recover the incomplete log segments - return writeHandler.recoverIncompleteLogSegments() - .map(new AbstractFunction1<Long, AsyncLogWriter>() { - @Override - public AsyncLogWriter apply(Long lastTxId) { - // 4. update last tx id if successfully recovered - writer.setLastTxId(lastTxId); - return writer; - } - }).onFailure(new AbstractFunction1<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable cause) { - // 5. close the writer if recovery failed - writer.asyncAbort(); - return BoxedUnit.UNIT; - } - }); + return createWriteHandleFuture.thenCompose(writeHandler -> { + final BKAsyncLogWriter writer; + synchronized (BKDistributedLogManager.this) { + // 2. create the writer with the handler + writer = new BKAsyncLogWriter( + conf, + dynConf, + BKDistributedLogManager.this, + writeHandler, + featureProvider, + statsLogger); } + // 3. recover the incomplete log segments + return writeHandler.recoverIncompleteLogSegments() + .thenApply(lastTxId -> { + // 4. update last tx id if successfully recovered + writer.setLastTxId(lastTxId); + return (AsyncLogWriter) writer; + }) + .whenComplete((lastTxId, cause) -> { + if (null != cause) { + // 5. close the writer if recovery failed + writer.asyncAbort(); + } + }); }); } @Override - public Future<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) { - return getLogSegmentsAsync().flatMap(new AbstractFunction1<List<LogSegmentMetadata>, Future<DLSN>>() { - @Override - public Future<DLSN> apply(List<LogSegmentMetadata> segments) { - return getDLSNNotLessThanTxId(fromTxnId, segments); - } - }); + public CompletableFuture<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) { + return getLogSegmentsAsync().thenCompose(segments -> getDLSNNotLessThanTxId(fromTxnId, segments)); } - private Future<DLSN> getDLSNNotLessThanTxId(long fromTxnId, + private CompletableFuture<DLSN> getDLSNNotLessThanTxId(long fromTxnId, final List<LogSegmentMetadata> segments) { if (segments.isEmpty()) { return getLastDLSNAsync(); } final int segmentIdx = DLUtils.findLogSegmentNotLessThanTxnId(segments, fromTxnId); if (segmentIdx < 0) { - return Future.value(new DLSN(segments.get(0).getLogSegmentSequenceNumber(), 0L, 0L)); + return FutureUtils.value(new DLSN(segments.get(0).getLogSegmentSequenceNumber(), 0L, 0L)); } return getDLSNNotLessThanTxIdInSegment( fromTxnId, @@ -603,7 +565,7 @@ class BKDistributedLogManager implements DistributedLogManager { ); } - private Future<DLSN> getDLSNNotLessThanTxIdInSegment(final long fromTxnId, + private CompletableFuture<DLSN> getDLSNNotLessThanTxIdInSegment(final long fromTxnId, final int segmentIdx, final List<LogSegmentMetadata> segments, final LogSegmentEntryStore entryStore) { @@ -615,29 +577,23 @@ class BKDistributedLogManager implements DistributedLogManager { scheduler, entryStore, Math.max(2, dynConf.getReadAheadBatchSize()) - ).flatMap(new AbstractFunction1<Optional<LogRecordWithDLSN>, Future<DLSN>>() { - @Override - public Future<DLSN> apply(Optional<LogRecordWithDLSN> foundRecord) { - if (foundRecord.isPresent()) { - return Future.value(foundRecord.get().getDlsn()); - } - if ((segments.size() - 1) == segmentIdx) { - return getLastLogRecordAsync().map(new AbstractFunction1<LogRecordWithDLSN, DLSN>() { - @Override - public DLSN apply(LogRecordWithDLSN record) { - if (record.getTransactionId() >= fromTxnId) { - return record.getDlsn(); - } - return record.getDlsn().getNextDLSN(); - } - }); - } else { - return getDLSNNotLessThanTxIdInSegment( - fromTxnId, - segmentIdx + 1, - segments, - entryStore); - } + ).thenCompose(foundRecord -> { + if (foundRecord.isPresent()) { + return FutureUtils.value(foundRecord.get().getDlsn()); + } + if ((segments.size() - 1) == segmentIdx) { + return getLastLogRecordAsync().thenApply(record -> { + if (record.getTransactionId() >= fromTxnId) { + return record.getDlsn(); + } + return record.getDlsn().getNextDLSN(); + }); + } else { + return getDLSNNotLessThanTxIdInSegment( + fromTxnId, + segmentIdx + 1, + segments, + entryStore); } }); } @@ -662,7 +618,7 @@ class BKDistributedLogManager implements DistributedLogManager { @Override public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException { - return FutureUtils.result(openAsyncLogReader(fromTxnId)); + return Utils.ioResult(openAsyncLogReader(fromTxnId)); } /** @@ -687,39 +643,34 @@ class BKDistributedLogManager implements DistributedLogManager { * @return future representing the open result. */ @Override - public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId) { - final Promise<DLSN> dlsnPromise = new Promise<DLSN>(); - getDLSNNotLessThanTxId(fromTxnId).addEventListener(new FutureEventListener<DLSN>() { + public CompletableFuture<AsyncLogReader> openAsyncLogReader(long fromTxnId) { + final CompletableFuture<DLSN> dlsnPromise = new CompletableFuture<DLSN>(); + getDLSNNotLessThanTxId(fromTxnId).whenComplete(new FutureEventListener<DLSN>() { @Override public void onSuccess(DLSN dlsn) { - dlsnPromise.setValue(dlsn); + dlsnPromise.complete(dlsn); } @Override public void onFailure(Throwable cause) { if (cause instanceof LogEmptyException) { - dlsnPromise.setValue(DLSN.InitialDLSN); + dlsnPromise.complete(DLSN.InitialDLSN); } else { - dlsnPromise.setException(cause); + dlsnPromise.completeExceptionally(cause); } } }); - return dlsnPromise.flatMap(new AbstractFunction1<DLSN, Future<AsyncLogReader>>() { - @Override - public Future<AsyncLogReader> apply(DLSN dlsn) { - return openAsyncLogReader(dlsn); - } - }); + return dlsnPromise.thenCompose(dlsn -> openAsyncLogReader(dlsn)); } @Override public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException { - return FutureUtils.result(openAsyncLogReader(fromDLSN)); + return Utils.ioResult(openAsyncLogReader(fromDLSN)); } @Override - public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) { + public CompletableFuture<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) { Optional<String> subscriberId = Optional.absent(); AsyncLogReader reader = new BKAsyncLogReader( this, @@ -729,7 +680,7 @@ class BKDistributedLogManager implements DistributedLogManager { false, statsLogger); pendingReaders.add(reader); - return Future.value(reader); + return FutureUtils.value(reader); } /** @@ -738,26 +689,26 @@ class BKDistributedLogManager implements DistributedLogManager { * blocked. */ @Override - public Future<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN fromDLSN) { + public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN fromDLSN) { Optional<String> subscriberId = Optional.absent(); return getAsyncLogReaderWithLock(Optional.of(fromDLSN), subscriberId); } @Override - public Future<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN fromDLSN, final String subscriberId) { + public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN fromDLSN, final String subscriberId) { return getAsyncLogReaderWithLock(Optional.of(fromDLSN), Optional.of(subscriberId)); } @Override - public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId) { + public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId) { Optional<DLSN> fromDLSN = Optional.absent(); return getAsyncLogReaderWithLock(fromDLSN, Optional.of(subscriberId)); } - protected Future<AsyncLogReader> getAsyncLogReaderWithLock(final Optional<DLSN> fromDLSN, + protected CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(final Optional<DLSN> fromDLSN, final Optional<String> subscriberId) { if (!fromDLSN.isPresent() && !subscriberId.isPresent()) { - return Future.exception(new UnexpectedException("Neither from dlsn nor subscriber id is provided.")); + return FutureUtils.exception(new UnexpectedException("Neither from dlsn nor subscriber id is provided.")); } final BKAsyncLogReader reader = new BKAsyncLogReader( BKDistributedLogManager.this, @@ -767,55 +718,50 @@ class BKDistributedLogManager implements DistributedLogManager { false, statsLogger); pendingReaders.add(reader); - final Future<Void> lockFuture = reader.lockStream(); - final Promise<AsyncLogReader> createPromise = new Promise<AsyncLogReader>( - new Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable cause) { + final CompletableFuture<Void> lockFuture = reader.lockStream(); + final CompletableFuture<AsyncLogReader> createPromise = FutureUtils.createFuture(); + createPromise.whenComplete((value, cause) -> { + if (cause instanceof CancellationException) { // cancel the lock when the creation future is cancelled - lockFuture.cancel(); - return BoxedUnit.UNIT; + lockFuture.cancel(true); } }); // lock the stream - fetch the last commit position on success - lockFuture.flatMap(new Function<Void, Future<AsyncLogReader>>() { + lockFuture.thenCompose(new Function<Void, CompletableFuture<AsyncLogReader>>() { @Override - public Future<AsyncLogReader> apply(Void complete) { + public CompletableFuture<AsyncLogReader> apply(Void complete) { if (fromDLSN.isPresent()) { - return Future.value((AsyncLogReader) reader); + return FutureUtils.value(reader); } LOG.info("Reader {} @ {} reading last commit position from subscription store after acquired lock.", subscriberId.get(), name); // we acquired lock final SubscriptionsStore subscriptionsStore = driver.getSubscriptionsStore(getStreamName()); return subscriptionsStore.getLastCommitPosition(subscriberId.get()) - .map(new ExceptionalFunction<DLSN, AsyncLogReader>() { - @Override - public AsyncLogReader applyE(DLSN lastCommitPosition) throws UnexpectedException { - LOG.info("Reader {} @ {} positioned to last commit position {}.", - new Object[] { subscriberId.get(), name, lastCommitPosition }); - reader.setStartDLSN(lastCommitPosition); - return reader; - } - }); + .thenCompose(lastCommitPosition -> { + LOG.info("Reader {} @ {} positioned to last commit position {}.", + new Object[] { subscriberId.get(), name, lastCommitPosition }); + try { + reader.setStartDLSN(lastCommitPosition); + } catch (UnexpectedException e) { + return FutureUtils.exception(e); + } + return FutureUtils.value(reader); + }); } - }).addEventListener(new FutureEventListener<AsyncLogReader>() { + }).whenComplete(new FutureEventListener<AsyncLogReader>() { @Override public void onSuccess(AsyncLogReader r) { pendingReaders.remove(reader); - FutureUtils.setValue(createPromise, r); + FutureUtils.complete(createPromise, r); } @Override public void onFailure(final Throwable cause) { pendingReaders.remove(reader); - reader.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - FutureUtils.setException(createPromise, cause); - return BoxedUnit.UNIT; - } - }); + FutureUtils.ensure( + reader.asyncClose(), + () -> FutureUtils.completeExceptionally(createPromise, cause)); } }); return createPromise; @@ -833,7 +779,7 @@ class BKDistributedLogManager implements DistributedLogManager { throws IOException { DLSN fromDLSN; try { - fromDLSN = FutureUtils.result(getDLSNNotLessThanTxId(fromTxnId)); + fromDLSN = Utils.ioResult(getDLSNNotLessThanTxId(fromTxnId)); } catch (LogEmptyException lee) { fromDLSN = DLSN.InitialDLSN; } @@ -861,25 +807,25 @@ class BKDistributedLogManager implements DistributedLogManager { @Override public LogRecordWithDLSN getLastLogRecord() throws IOException { checkClosedOrInError("getLastLogRecord"); - return FutureUtils.result(getLastLogRecordAsync()); + return Utils.ioResult(getLastLogRecordAsync()); } @Override public long getFirstTxId() throws IOException { checkClosedOrInError("getFirstTxId"); - return FutureUtils.result(getFirstRecordAsyncInternal()).getTransactionId(); + return Utils.ioResult(getFirstRecordAsyncInternal()).getTransactionId(); } @Override public long getLastTxId() throws IOException { checkClosedOrInError("getLastTxId"); - return FutureUtils.result(getLastTxIdAsync()); + return Utils.ioResult(getLastTxIdAsync()); } @Override public DLSN getLastDLSN() throws IOException { checkClosedOrInError("getLastDLSN"); - return FutureUtils.result(getLastLogRecordAsyncInternal(false, false)).getDlsn(); + return Utils.ioResult(getLastLogRecordAsyncInternal(false, false)).getDlsn(); } /** @@ -888,15 +834,15 @@ class BKDistributedLogManager implements DistributedLogManager { * @return latest log record */ @Override - public Future<LogRecordWithDLSN> getLastLogRecordAsync() { + public CompletableFuture<LogRecordWithDLSN> getLastLogRecordAsync() { return getLastLogRecordAsyncInternal(false, false); } - private Future<LogRecordWithDLSN> getLastLogRecordAsyncInternal(final boolean recover, + private CompletableFuture<LogRecordWithDLSN> getLastLogRecordAsyncInternal(final boolean recover, final boolean includeEndOfStream) { - return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() { + return processReaderOperation(new Function<BKLogReadHandler, CompletableFuture<LogRecordWithDLSN>>() { @Override - public Future<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) { + public CompletableFuture<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) { return ledgerHandler.getLastLogRecordAsync(recover, includeEndOfStream); } }); @@ -908,9 +854,9 @@ class BKDistributedLogManager implements DistributedLogManager { * @return latest transaction id */ @Override - public Future<Long> getLastTxIdAsync() { + public CompletableFuture<Long> getLastTxIdAsync() { return getLastLogRecordAsyncInternal(false, false) - .map(RECORD_2_TXID_FUNCTION); + .thenApply(RECORD_2_TXID_FUNCTION); } /** @@ -919,14 +865,14 @@ class BKDistributedLogManager implements DistributedLogManager { * @return first dlsn in the stream */ @Override - public Future<DLSN> getFirstDLSNAsync() { - return getFirstRecordAsyncInternal().map(RECORD_2_DLSN_FUNCTION); + public CompletableFuture<DLSN> getFirstDLSNAsync() { + return getFirstRecordAsyncInternal().thenApply(RECORD_2_DLSN_FUNCTION); } - private Future<LogRecordWithDLSN> getFirstRecordAsyncInternal() { - return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() { + private CompletableFuture<LogRecordWithDLSN> getFirstRecordAsyncInternal() { + return processReaderOperation(new Function<BKLogReadHandler, CompletableFuture<LogRecordWithDLSN>>() { @Override - public Future<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) { + public CompletableFuture<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) { return ledgerHandler.asyncGetFirstLogRecord(); } }); @@ -938,9 +884,9 @@ class BKDistributedLogManager implements DistributedLogManager { * @return latest transaction id */ @Override - public Future<DLSN> getLastDLSNAsync() { + public CompletableFuture<DLSN> getLastDLSNAsync() { return getLastLogRecordAsyncInternal(false, false) - .map(RECORD_2_DLSN_FUNCTION); + .thenApply(RECORD_2_DLSN_FUNCTION); } /** @@ -953,7 +899,7 @@ class BKDistributedLogManager implements DistributedLogManager { @Override public long getLogRecordCount() throws IOException { checkClosedOrInError("getLogRecordCount"); - return FutureUtils.result(getLogRecordCountAsync(DLSN.InitialDLSN)); + return Utils.ioResult(getLogRecordCountAsync(DLSN.InitialDLSN)); } /** @@ -964,10 +910,10 @@ class BKDistributedLogManager implements DistributedLogManager { * @throws IOException */ @Override - public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN) { - return processReaderOperation(new Function<BKLogReadHandler, Future<Long>>() { + public CompletableFuture<Long> getLogRecordCountAsync(final DLSN beginDLSN) { + return processReaderOperation(new Function<BKLogReadHandler, CompletableFuture<Long>>() { @Override - public Future<Long> apply(BKLogReadHandler ledgerHandler) { + public CompletableFuture<Long> apply(BKLogReadHandler ledgerHandler) { return ledgerHandler.asyncGetLogRecordCount(beginDLSN); } }); @@ -991,7 +937,7 @@ class BKDistributedLogManager implements DistributedLogManager { checkClosedOrInError("recoverInternal"); BKLogWriteHandler ledgerHandler = createWriteHandler(true); try { - FutureUtils.result(ledgerHandler.recoverIncompleteLogSegments()); + Utils.ioResult(ledgerHandler.recoverIncompleteLogSegments()); } finally { Utils.closeQuietly(ledgerHandler); } @@ -1004,7 +950,7 @@ class BKDistributedLogManager implements DistributedLogManager { */ @Override public void delete() throws IOException { - FutureUtils.result(driver.getLogStreamMetadataStore(WRITER) + Utils.ioResult(driver.getLogStreamMetadataStore(WRITER) .deleteLog(uri, getStreamName())); } @@ -1025,7 +971,7 @@ class BKDistributedLogManager implements DistributedLogManager { BKLogWriteHandler ledgerHandler = createWriteHandler(true); try { LOG.info("Purging logs for {} older than {}", ledgerHandler.getFullyQualifiedName(), minTxIdToKeep); - FutureUtils.result(ledgerHandler.purgeLogSegmentsOlderThanTxnId(minTxIdToKeep)); + Utils.ioResult(ledgerHandler.purgeLogSegmentsOlderThanTxnId(minTxIdToKeep)); } finally { Utils.closeQuietly(ledgerHandler); } @@ -1049,14 +995,11 @@ class BKDistributedLogManager implements DistributedLogManager { } @Override - public Future<Void> asyncClose() { + public CompletableFuture<Void> asyncClose() { return Utils.closeSequence(executorService, true, readers.toArray(new AsyncLogReader[readers.size()])) - .onSuccess(new AbstractFunction1<Void, BoxedUnit>() { - @Override - public BoxedUnit apply(Void value) { - readers.clear(); - return BoxedUnit.UNIT; - } + .thenApply(value -> { + readers.clear(); + return null; }); } }; @@ -1065,28 +1008,28 @@ class BKDistributedLogManager implements DistributedLogManager { * Close the distributed log manager, freeing any resources it may hold. */ @Override - public Future<Void> asyncClose() { - Promise<Void> closeFuture; + public CompletableFuture<Void> asyncClose() { + CompletableFuture<Void> closeFuture; BKLogReadHandler readHandlerToClose; synchronized (this) { if (null != closePromise) { return closePromise; } - closeFuture = closePromise = new Promise<Void>(); + closeFuture = closePromise = new CompletableFuture<Void>(); readHandlerToClose = readHandlerForListener; } - Future<Void> closeResult = Utils.closeSequence(null, true, + CompletableFuture<Void> closeResult = Utils.closeSequence(null, true, readHandlerToClose, pendingReaders, resourcesCloseable.or(AsyncCloseable.NULL)); - closeResult.proxyTo(closeFuture); + FutureUtils.proxyTo(closeResult, closeFuture); return closeFuture; } @Override public void close() throws IOException { - FutureUtils.result(asyncClose()); + Utils.ioResult(asyncClose()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java index adb591f..60ad916 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java @@ -20,6 +20,8 @@ package org.apache.distributedlog; import com.google.common.base.Optional; import com.google.common.base.Ticker; import org.apache.distributedlog.acl.AccessControlManager; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.callback.NamespaceListener; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; import org.apache.distributedlog.exceptions.AlreadyClosedException; @@ -28,14 +30,11 @@ import org.apache.distributedlog.exceptions.LogNotFoundException; import org.apache.distributedlog.injector.AsyncFailureInjector; import org.apache.distributedlog.io.AsyncCloseable; import org.apache.distributedlog.logsegment.LogSegmentMetadataCache; -import org.apache.distributedlog.namespace.DistributedLogNamespace; import org.apache.distributedlog.namespace.NamespaceDriver; import org.apache.distributedlog.util.ConfUtils; -import org.apache.distributedlog.util.FutureUtils; -import org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor; import org.apache.distributedlog.util.OrderedScheduler; -import org.apache.distributedlog.util.PermitLimiter; -import org.apache.distributedlog.util.SchedulerUtils; +import org.apache.distributedlog.common.util.PermitLimiter; +import org.apache.distributedlog.common.util.SchedulerUtils; import org.apache.distributedlog.util.Utils; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.stats.StatsLogger; @@ -52,7 +51,7 @@ import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER; import static org.apache.distributedlog.util.DLUtils.validateAndNormalizeName; /** - * BKDistributedLogNamespace is the default implementation of {@link DistributedLogNamespace}. It uses + * BKDistributedLogNamespace is the default implementation of {@link Namespace}. It uses * zookeeper for metadata storage and bookkeeper for data storage. * <h3>Metrics</h3> * @@ -74,8 +73,6 @@ import static org.apache.distributedlog.util.DLUtils.validateAndNormalizeName; * <ul> * <li> `scope`/factory/thread_pool/* : stats about the ordered scheduler used by this namespace. * See {@link OrderedScheduler}. - * <li> `scope`/factory/readahead_thread_pool/* : stats about the readahead thread pool executor - * used by this namespace. See {@link MonitoredScheduledThreadPoolExecutor}. * <li> `scope`/writeLimiter/* : stats about the global write limiter used by this namespace. * See {@link PermitLimiter}. * </ul> @@ -84,7 +81,7 @@ import static org.apache.distributedlog.util.DLUtils.validateAndNormalizeName; * * All the core stats about reader and writer are exposed under current scope via {@link BKDistributedLogManager}. */ -public class BKDistributedLogNamespace implements DistributedLogNamespace { +public class BKDistributedLogNamespace implements Namespace { static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogNamespace.class); private final String clientId; @@ -149,8 +146,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { throws InvalidStreamNameException, IOException { checkState(); logName = validateAndNormalizeName(logName); - URI uri = FutureUtils.result(driver.getLogMetadataStore().createLog(logName)); - FutureUtils.result(driver.getLogStreamMetadataStore(WRITER).getLog(uri, logName, true, true)); + URI uri = Utils.ioResult(driver.getLogMetadataStore().createLog(logName)); + Utils.ioResult(driver.getLogStreamMetadataStore(WRITER).getLog(uri, logName, true, true)); } @Override @@ -158,7 +155,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { throws InvalidStreamNameException, LogNotFoundException, IOException { checkState(); logName = validateAndNormalizeName(logName); - Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName)); + Optional<URI> uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName)); if (!uri.isPresent()) { throw new LogNotFoundException("Log " + logName + " isn't found."); } @@ -187,7 +184,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { throws InvalidStreamNameException, IOException { checkState(); logName = validateAndNormalizeName(logName); - Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName)); + Optional<URI> uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName)); if (!uri.isPresent()) { throw new LogNotFoundException("Log " + logName + " isn't found."); } @@ -202,10 +199,10 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { public boolean logExists(String logName) throws IOException, IllegalArgumentException { checkState(); - Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName)); + Optional<URI> uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName)); if (uri.isPresent()) { try { - FutureUtils.result(driver.getLogStreamMetadataStore(WRITER) + Utils.ioResult(driver.getLogStreamMetadataStore(WRITER) .logExists(uri.get(), logName)); return true; } catch (LogNotFoundException lnfe) { @@ -219,7 +216,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { @Override public Iterator<String> getLogs() throws IOException { checkState(); - return FutureUtils.result(driver.getLogMetadataStore().getLogs()); + return Utils.ioResult(driver.getLogMetadataStore().getLogs()); } @Override