http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java index fdb29f3..1293d00 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java @@ -17,9 +17,25 @@ */ package org.apache.distributedlog; +import static org.apache.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.stats.AlertStatsLogger; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Versioned; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; import org.apache.distributedlog.exceptions.DLIllegalStateException; import org.apache.distributedlog.exceptions.EndOfStreamException; @@ -43,34 +59,14 @@ import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater; import org.apache.distributedlog.util.Allocator; import org.apache.distributedlog.util.DLUtils; import org.apache.distributedlog.util.FailpointUtils; -import org.apache.distributedlog.util.FutureUtils; -import org.apache.distributedlog.util.FutureUtils.FutureEventListenerRunnable; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; import org.apache.distributedlog.util.Transaction; -import org.apache.distributedlog.util.PermitLimiter; +import org.apache.distributedlog.common.util.PermitLimiter; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.stats.AlertStatsLogger; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.apache.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER; /** * Log Handler for Writers. @@ -108,7 +104,7 @@ class BKLogWriteHandler extends BKLogHandler { protected final boolean validateLogSegmentSequenceNumber; protected final int regionId; protected final RollingPolicy rollingPolicy; - protected Future<? extends DistributedLock> lockFuture = null; + protected CompletableFuture<? extends DistributedLock> lockFuture = null; protected final PermitLimiter writeLimiter; protected final FeatureProvider featureProvider; protected final DynamicDistributedLogConfiguration dynConf; @@ -117,16 +113,16 @@ class BKLogWriteHandler extends BKLogHandler { protected final LinkedList<Long> inprogressLSSNs; // Fetch LogSegments State: write can continue without full list of log segments while truncation needs - private final Future<Versioned<List<LogSegmentMetadata>>> fetchForWrite; - private Future<Versioned<List<LogSegmentMetadata>>> fetchForTruncation; + private final CompletableFuture<Versioned<List<LogSegmentMetadata>>> fetchForWrite; + private CompletableFuture<Versioned<List<LogSegmentMetadata>>> fetchForTruncation; // Recover Functions private final RecoverLogSegmentFunction recoverLogSegmentFunction = new RecoverLogSegmentFunction(); - private final AbstractFunction1<List<LogSegmentMetadata>, Future<Long>> recoverLogSegmentsFunction = - new AbstractFunction1<List<LogSegmentMetadata>, Future<Long>>() { + private final Function<List<LogSegmentMetadata>, CompletableFuture<Long>> recoverLogSegmentsFunction = + new Function<List<LogSegmentMetadata>, CompletableFuture<Long>>() { @Override - public Future<Long> apply(List<LogSegmentMetadata> segmentList) { + public CompletableFuture<Long> apply(List<LogSegmentMetadata> segmentList) { LOG.info("Initiating Recovery For {} : {}", getFullyQualifiedName(), segmentList); // if lastLedgerRollingTimeMillis is not updated, we set it to now. synchronized (BKLogWriteHandler.this) { @@ -145,8 +141,11 @@ class BKLogWriteHandler extends BKLogHandler { } } - return FutureUtils.processList(segmentList, recoverLogSegmentFunction, scheduler).map( - GetLastTxIdFunction.INSTANCE); + return FutureUtils.processList( + segmentList, + recoverLogSegmentFunction, + scheduler + ).thenApply(GetLastTxIdFunction.INSTANCE); } }; @@ -232,30 +231,30 @@ class BKLogWriteHandler extends BKLogHandler { deleteOpStats = segmentsStatsLogger.getOpStatsLogger("delete"); } - private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFetch( + private CompletableFuture<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFetch( final Comparator<LogSegmentMetadata> comparator) { - final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>(); - fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + final CompletableFuture<List<LogSegmentMetadata>> promise = new CompletableFuture<List<LogSegmentMetadata>>(); + fetchForWrite.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { @Override public void onFailure(Throwable cause) { - FutureUtils.setException(promise, cause); + promise.completeExceptionally(cause); } @Override public void onSuccess(Versioned<List<LogSegmentMetadata>> result) { try { - FutureUtils.setValue(promise, getCachedLogSegments(comparator)); + promise.complete(getCachedLogSegments(comparator)); } catch (UnexpectedException e) { - FutureUtils.setException(promise, e); + promise.completeExceptionally(e); } } }); return promise; } - private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFullFetch( + private CompletableFuture<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFullFetch( final Comparator<LogSegmentMetadata> comparator) { - Future<Versioned<List<LogSegmentMetadata>>> result; + CompletableFuture<Versioned<List<LogSegmentMetadata>>> result; synchronized (this) { if (null == fetchForTruncation) { fetchForTruncation = readLogSegmentsFromStore( @@ -266,19 +265,19 @@ class BKLogWriteHandler extends BKLogHandler { result = fetchForTruncation; } - final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>(); - result.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + final CompletableFuture<List<LogSegmentMetadata>> promise = new CompletableFuture<List<LogSegmentMetadata>>(); + result.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { @Override public void onFailure(Throwable cause) { - FutureUtils.setException(promise, cause); + FutureUtils.completeExceptionally(promise, cause); } @Override public void onSuccess(Versioned<List<LogSegmentMetadata>> result) { try { - FutureUtils.setValue(promise, getCachedLogSegments(comparator)); + FutureUtils.complete(promise, getCachedLogSegments(comparator)); } catch (UnexpectedException e) { - FutureUtils.setException(promise, e); + FutureUtils.completeExceptionally(promise, e); } } }); @@ -374,7 +373,7 @@ class BKLogWriteHandler extends BKLogHandler { * * @return future represents the lock result */ - Future<? extends DistributedLock> lockHandler() { + CompletableFuture<? extends DistributedLock> lockHandler() { if (null != lockFuture) { return lockFuture; } @@ -382,11 +381,11 @@ class BKLogWriteHandler extends BKLogHandler { return lockFuture; } - Future<Void> unlockHandler() { + CompletableFuture<Void> unlockHandler() { if (null != lockFuture) { return lock.asyncClose(); } else { - return Future.Void(); + return FutureUtils.Void(); } } @@ -483,23 +482,23 @@ class BKLogWriteHandler extends BKLogHandler { } protected BKLogSegmentWriter doStartLogSegment(long txId, boolean bestEffort, boolean allowMaxTxID) throws IOException { - return FutureUtils.result(asyncStartLogSegment(txId, bestEffort, allowMaxTxID)); + return Utils.ioResult(asyncStartLogSegment(txId, bestEffort, allowMaxTxID)); } - protected Future<BKLogSegmentWriter> asyncStartLogSegment(final long txId, + protected CompletableFuture<BKLogSegmentWriter> asyncStartLogSegment(final long txId, final boolean bestEffort, final boolean allowMaxTxID) { - final Promise<BKLogSegmentWriter> promise = new Promise<BKLogSegmentWriter>(); + final CompletableFuture<BKLogSegmentWriter> promise = new CompletableFuture<BKLogSegmentWriter>(); try { lock.checkOwnershipAndReacquire(); } catch (LockingException e) { - FutureUtils.setException(promise, e); + FutureUtils.completeExceptionally(promise, e); return promise; } - fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + fetchForWrite.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { @Override public void onFailure(Throwable cause) { - FutureUtils.setException(promise, cause); + FutureUtils.completeExceptionally(promise, cause); } @Override @@ -513,11 +512,11 @@ class BKLogWriteHandler extends BKLogHandler { protected void doStartLogSegment(final long txId, final boolean bestEffort, final boolean allowMaxTxID, - final Promise<BKLogSegmentWriter> promise) { + final CompletableFuture<BKLogSegmentWriter> promise) { // validate the tx id if ((txId < 0) || (!allowMaxTxID && (txId == DistributedLogConstants.MAX_TXID))) { - FutureUtils.setException(promise, new IOException("Invalid Transaction Id " + txId)); + FutureUtils.completeExceptionally(promise, new IOException("Invalid Transaction Id " + txId)); return; } @@ -525,11 +524,11 @@ class BKLogWriteHandler extends BKLogHandler { if (txId < highestTxIdWritten) { if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) { LOG.error("We've already marked the stream as ended and attempting to start a new log segment"); - FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed")); + FutureUtils.completeExceptionally(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed")); return; } else { LOG.error("We've already seen TxId {} the max TXId is {}", txId, highestTxIdWritten); - FutureUtils.setException(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten)); + FutureUtils.completeExceptionally(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten)); return; } } @@ -554,7 +553,7 @@ class BKLogWriteHandler extends BKLogHandler { } logSegmentAllocator.tryObtain(txn, NULL_OP_LISTENER) - .addEventListener(new FutureEventListener<LogSegmentEntryWriter>() { + .whenComplete(new FutureEventListener<LogSegmentEntryWriter>() { @Override public void onSuccess(LogSegmentEntryWriter entryWriter) { @@ -574,13 +573,13 @@ class BKLogWriteHandler extends BKLogHandler { }); } - private void failStartLogSegment(Promise<BKLogSegmentWriter> promise, + private void failStartLogSegment(CompletableFuture<BKLogSegmentWriter> promise, boolean bestEffort, Throwable cause) { if (bestEffort) { - FutureUtils.setValue(promise, null); + FutureUtils.complete(promise, null); } else { - FutureUtils.setException(promise, cause); + FutureUtils.completeExceptionally(promise, cause); } } @@ -591,7 +590,7 @@ class BKLogWriteHandler extends BKLogHandler { final long txId, final LogSegmentEntryWriter entryWriter, boolean bestEffort, - final Promise<BKLogSegmentWriter> promise) { + final CompletableFuture<BKLogSegmentWriter> promise) { final long logSegmentSeqNo; try { FailpointUtils.checkFailPoint( @@ -626,12 +625,12 @@ class BKLogWriteHandler extends BKLogHandler { LOG.debug("Try storing MaxTxId in startLogSegment {} {}", inprogressZnodePath, txId); storeMaxTxId(txn, maxTxId, txId); - txn.execute().addEventListener(FutureEventListenerRunnable.of(new FutureEventListener<Void>() { + txn.execute().whenCompleteAsync(new FutureEventListener<Void>() { @Override public void onSuccess(Void value) { try { - FutureUtils.setValue(promise, new BKLogSegmentWriter( + FutureUtils.complete(promise, new BKLogSegmentWriter( getFullyQualifiedName(), l.getSegmentName(), conf, @@ -656,7 +655,7 @@ class BKLogWriteHandler extends BKLogHandler { public void onFailure(Throwable cause) { failStartLogSegment(promise, false, cause); } - }, scheduler)); + }, scheduler); } boolean shouldStartNewSegment(BKLogSegmentWriter writer) { @@ -672,21 +671,21 @@ class BKLogWriteHandler extends BKLogHandler { * the firstTxId of the ledger matches firstTxId for the segment we are * trying to finalize. */ - Future<LogSegmentMetadata> completeAndCloseLogSegment(final BKLogSegmentWriter writer) { - final Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>(); + CompletableFuture<LogSegmentMetadata> completeAndCloseLogSegment(final BKLogSegmentWriter writer) { + final CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>(); completeAndCloseLogSegment(writer, promise); return promise; } private void completeAndCloseLogSegment(final BKLogSegmentWriter writer, - final Promise<LogSegmentMetadata> promise) { - writer.asyncClose().addEventListener(new FutureEventListener<Void>() { + final CompletableFuture<LogSegmentMetadata> promise) { + writer.asyncClose().whenComplete(new FutureEventListener<Void>() { @Override public void onSuccess(Void value) { // in theory closeToFinalize should throw exception if a stream is in error. // just in case, add another checking here to make sure we don't close log segment is a stream is in error. if (writer.shouldFailCompleteLogSegment()) { - FutureUtils.setException(promise, + FutureUtils.completeExceptionally(promise, new IOException("LogSegmentWriter for " + writer.getFullyQualifiedLogSegment() + " is already in error.")); return; } @@ -704,7 +703,7 @@ class BKLogWriteHandler extends BKLogHandler { @Override public void onFailure(Throwable cause) { - FutureUtils.setException(promise, cause); + FutureUtils.completeExceptionally(promise, cause); } }); } @@ -791,7 +790,7 @@ class BKLogWriteHandler extends BKLogHandler { int recordCount, long lastEntryId, long lastSlotId) throws IOException { - Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>(); + CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>(); doCompleteAndCloseLogSegment( inprogressZnodeName, logSegmentSeqNo, @@ -802,7 +801,7 @@ class BKLogWriteHandler extends BKLogHandler { lastEntryId, lastSlotId, promise); - return FutureUtils.result(promise); + return Utils.ioResult(promise); } protected void doCompleteAndCloseLogSegment(final String inprogressZnodeName, @@ -813,11 +812,11 @@ class BKLogWriteHandler extends BKLogHandler { final int recordCount, final long lastEntryId, final long lastSlotId, - final Promise<LogSegmentMetadata> promise) { - fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + final CompletableFuture<LogSegmentMetadata> promise) { + fetchForWrite.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { @Override public void onFailure(Throwable cause) { - FutureUtils.setException(promise, cause); + FutureUtils.completeExceptionally(promise, cause); } @Override @@ -845,11 +844,11 @@ class BKLogWriteHandler extends BKLogHandler { int recordCount, long lastEntryId, long lastSlotId, - final Promise<LogSegmentMetadata> promise) { + final CompletableFuture<LogSegmentMetadata> promise) { try { lock.checkOwnershipAndReacquire(); } catch (IOException ioe) { - FutureUtils.setException(promise, ioe); + FutureUtils.completeExceptionally(promise, ioe); return; } @@ -858,7 +857,7 @@ class BKLogWriteHandler extends BKLogHandler { // validate log segment if (inprogressLogSegment.getLogSegmentId() != logSegmentId) { - FutureUtils.setException(promise, new IOException( + FutureUtils.completeExceptionally(promise, new IOException( "Active ledger has different ID to inprogress. " + inprogressLogSegment.getLogSegmentId() + " found, " + logSegmentId + " expected")); @@ -866,7 +865,7 @@ class BKLogWriteHandler extends BKLogHandler { } // validate the transaction id if (inprogressLogSegment.getFirstTxId() != firstTxId) { - FutureUtils.setException(promise, new IOException("Transaction id not as expected, " + FutureUtils.completeExceptionally(promise, new IOException("Transaction id not as expected, " + inprogressLogSegment.getFirstTxId() + " found, " + firstTxId + " expected")); return; } @@ -874,7 +873,7 @@ class BKLogWriteHandler extends BKLogHandler { if (validateLogSegmentSequenceNumber) { synchronized (inprogressLSSNs) { if (inprogressLSSNs.isEmpty()) { - FutureUtils.setException(promise, new UnexpectedException( + FutureUtils.completeExceptionally(promise, new UnexpectedException( "Didn't find matched inprogress log segments when completing inprogress " + inprogressLogSegment)); return; @@ -886,7 +885,7 @@ class BKLogWriteHandler extends BKLogHandler { // it should also be same as the least inprogress log segment sequence number tracked in {@link inprogressLSSNs} if ((inprogressLogSegment.getLogSegmentSequenceNumber() != logSegmentSeqNo) || (leastInprogressLSSN != logSegmentSeqNo)) { - FutureUtils.setException(promise, new UnexpectedException( + FutureUtils.completeExceptionally(promise, new UnexpectedException( "Didn't find matched inprogress log segments when completing inprogress " + inprogressLogSegment)); return; @@ -906,7 +905,7 @@ class BKLogWriteHandler extends BKLogHandler { LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}", new Object[] { maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, getFullyQualifiedName() }); if (validateLogSegmentSequenceNumber) { - FutureUtils.setException(promise, new DLIllegalStateException("Unexpected max log segment sequence number " + FutureUtils.completeExceptionally(promise, new DLIllegalStateException("Unexpected max log segment sequence number " + maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName() + ", expected " + (logSegmentSeqNo - 1))); return; @@ -919,7 +918,7 @@ class BKLogWriteHandler extends BKLogHandler { try { startSequenceId = computeStartSequenceId(inprogressLogSegment); } catch (IOException ioe) { - FutureUtils.setException(promise, ioe); + FutureUtils.completeExceptionally(promise, ioe); return; } // write completed ledger znode @@ -946,50 +945,45 @@ class BKLogWriteHandler extends BKLogHandler { LOG.debug("Trying storing LastTxId in Finalize Path {} LastTxId {}", pathForCompletedLedger, lastTxId); storeMaxTxId(txn, maxTxId, lastTxId); - txn.execute().addEventListener(FutureEventListenerRunnable.of(new FutureEventListener<Void>() { + txn.execute().whenCompleteAsync(new FutureEventListener<Void>() { @Override public void onSuccess(Void value) { LOG.info("Completed {} to {} for {} : {}", new Object[] { inprogressZnodeName, completedLogSegment.getSegmentName(), getFullyQualifiedName(), completedLogSegment }); - FutureUtils.setValue(promise, completedLogSegment); + FutureUtils.complete(promise, completedLogSegment); } @Override public void onFailure(Throwable cause) { - FutureUtils.setException(promise, cause); + FutureUtils.completeExceptionally(promise, cause); } - }, scheduler)); + }, scheduler); } - public Future<Long> recoverIncompleteLogSegments() { + public CompletableFuture<Long> recoverIncompleteLogSegments() { try { FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments); } catch (IOException ioe) { - return Future.exception(ioe); + return FutureUtils.exception(ioe); } - return getCachedLogSegmentsAfterFirstFetch(LogSegmentMetadata.COMPARATOR).flatMap(recoverLogSegmentsFunction); + return getCachedLogSegmentsAfterFirstFetch(LogSegmentMetadata.COMPARATOR).thenCompose(recoverLogSegmentsFunction); } - class RecoverLogSegmentFunction extends Function<LogSegmentMetadata, Future<LogSegmentMetadata>> { + class RecoverLogSegmentFunction implements Function<LogSegmentMetadata, CompletableFuture<LogSegmentMetadata>> { @Override - public Future<LogSegmentMetadata> apply(final LogSegmentMetadata l) { + public CompletableFuture<LogSegmentMetadata> apply(final LogSegmentMetadata l) { if (!l.isInProgress()) { - return Future.value(l); + return FutureUtils.value(l); } LOG.info("Recovering last record in log segment {} for {}.", l, getFullyQualifiedName()); - return asyncReadLastRecord(l, true, true, true).flatMap( - new AbstractFunction1<LogRecordWithDLSN, Future<LogSegmentMetadata>>() { - @Override - public Future<LogSegmentMetadata> apply(LogRecordWithDLSN lastRecord) { - return completeLogSegment(l, lastRecord); - } - }); + return asyncReadLastRecord(l, true, true, true).thenCompose( + lastRecord -> completeLogSegment(l, lastRecord)); } - private Future<LogSegmentMetadata> completeLogSegment(LogSegmentMetadata l, + private CompletableFuture<LogSegmentMetadata> completeLogSegment(LogSegmentMetadata l, LogRecordWithDLSN lastRecord) { LOG.info("Recovered last record in log segment {} for {}.", l, getFullyQualifiedName()); @@ -1009,14 +1003,14 @@ class BKLogWriteHandler extends BKLogHandler { LOG.error("Unrecoverable corruption has occurred in segment " + l.toString() + " at path " + l.getZkPath() + ". Unable to continue recovery."); - return Future.exception(new IOException("Unrecoverable corruption," + return FutureUtils.exception(new IOException("Unrecoverable corruption," + " please check logs.")); } else if (endTxId == DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID) { // TODO: Empty ledger - Ideally we should just remove it? endTxId = l.getFirstTxId(); } - Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>(); + CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>(); doCompleteAndCloseLogSegment( l.getZNodeName(), l.getLogSegmentSequenceNumber(), @@ -1032,21 +1026,16 @@ class BKLogWriteHandler extends BKLogHandler { } - Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) { + CompletableFuture<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) { if (DLSN.InvalidDLSN == dlsn) { List<LogSegmentMetadata> emptyList = new ArrayList<LogSegmentMetadata>(0); - return Future.value(emptyList); + return FutureUtils.value(emptyList); } - return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap( - new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() { - @Override - public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) { - return setLogSegmentsOlderThanDLSNTruncated(logSegments, dlsn); - } - }); + return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).thenCompose( + logSegments -> setLogSegmentsOlderThanDLSNTruncated(logSegments, dlsn)); } - private Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(List<LogSegmentMetadata> logSegments, + private CompletableFuture<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(List<LogSegmentMetadata> logSegments, final DLSN dlsn) { LOG.debug("Setting truncation status on logs older than {} from {} for {}", new Object[]{dlsn, logSegments, getFullyQualifiedName()}); @@ -1064,7 +1053,7 @@ class BKLogWriteHandler extends BKLogHandler { if (null != partialTruncate) { String logMsg = String.format("Potential metadata inconsistency for stream %s at segment %s", getFullyQualifiedName(), l); LOG.error(logMsg); - return Future.exception(new DLIllegalStateException(logMsg)); + return FutureUtils.exception(new DLIllegalStateException(logMsg)); } LOG.info("{}: Partially truncating log segment {} older than {}.", new Object[] {getFullyQualifiedName(), l, dlsn}); partialTruncate = l; @@ -1096,15 +1085,15 @@ class BKLogWriteHandler extends BKLogHandler { } } - Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final long minTimestampToKeep) { + CompletableFuture<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final long minTimestampToKeep) { if (minTimestampToKeep >= Utils.nowInMillis()) { - return Future.exception(new IllegalArgumentException( + return FutureUtils.exception(new IllegalArgumentException( "Invalid timestamp " + minTimestampToKeep + " to purge logs for " + getFullyQualifiedName())); } - return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap( - new Function<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() { + return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).thenCompose( + new Function<List<LogSegmentMetadata>, CompletableFuture<List<LogSegmentMetadata>>>() { @Override - public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) { + public CompletableFuture<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) { List<LogSegmentMetadata> purgeList = new ArrayList<LogSegmentMetadata>(logSegments.size()); int numCandidates = getNumCandidateLogSegmentsToPurge(logSegments); @@ -1129,38 +1118,35 @@ class BKLogWriteHandler extends BKLogHandler { }); } - Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long minTxIdToKeep) { - return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap( - new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() { - @Override - public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) { - int numLogSegmentsToProcess; + CompletableFuture<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long minTxIdToKeep) { + return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).thenCompose( + logSegments -> { + int numLogSegmentsToProcess; - if (minTxIdToKeep < 0) { - // we are deleting the log, we can remove whole log segments - numLogSegmentsToProcess = logSegments.size(); + if (minTxIdToKeep < 0) { + // we are deleting the log, we can remove whole log segments + numLogSegmentsToProcess = logSegments.size(); + } else { + numLogSegmentsToProcess = getNumCandidateLogSegmentsToPurge(logSegments); + } + List<LogSegmentMetadata> purgeList = Lists.newArrayListWithExpectedSize(numLogSegmentsToProcess); + for (int iterator = 0; iterator < numLogSegmentsToProcess; iterator++) { + LogSegmentMetadata l = logSegments.get(iterator); + if ((minTxIdToKeep < 0) || + ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) && + !l.isInProgress() && (l.getLastTxId() < minTxIdToKeep))) { + purgeList.add(l); } else { - numLogSegmentsToProcess = getNumCandidateLogSegmentsToPurge(logSegments); - } - List<LogSegmentMetadata> purgeList = Lists.newArrayListWithExpectedSize(numLogSegmentsToProcess); - for (int iterator = 0; iterator < numLogSegmentsToProcess; iterator++) { - LogSegmentMetadata l = logSegments.get(iterator); - if ((minTxIdToKeep < 0) || - ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) && - !l.isInProgress() && (l.getLastTxId() < minTxIdToKeep))) { - purgeList.add(l); - } else { - // stop truncating log segments if we find either an inprogress or a partially - // truncated log segment - break; - } + // stop truncating log segments if we find either an inprogress or a partially + // truncated log segment + break; } - return deleteLogSegments(purgeList); } + return deleteLogSegments(purgeList); }); } - private Future<List<LogSegmentMetadata>> setLogSegmentTruncationStatus( + private CompletableFuture<List<LogSegmentMetadata>> setLogSegmentTruncationStatus( final List<LogSegmentMetadata> truncateList, LogSegmentMetadata partialTruncate, DLSN minActiveDLSN) { @@ -1183,39 +1169,31 @@ class BKLogWriteHandler extends BKLogHandler { listAfterTruncated.add(newSegment); } - return updateTxn.execute().map(new AbstractFunction1<Void, List<LogSegmentMetadata>>() { - @Override - public List<LogSegmentMetadata> apply(Void value) { - for (int i = 0; i < listToTruncate.size(); i++) { - removeLogSegmentFromCache(listToTruncate.get(i).getSegmentName()); - LogSegmentMetadata newSegment = listAfterTruncated.get(i); - addLogSegmentToCache(newSegment.getSegmentName(), newSegment); - } - return listAfterTruncated; + return updateTxn.execute().thenApply(value -> { + for (int i = 0; i < listToTruncate.size(); i++) { + removeLogSegmentFromCache(listToTruncate.get(i).getSegmentName()); + LogSegmentMetadata newSegment = listAfterTruncated.get(i); + addLogSegmentToCache(newSegment.getSegmentName(), newSegment); } + return listAfterTruncated; }); } - private Future<List<LogSegmentMetadata>> deleteLogSegments( + private CompletableFuture<List<LogSegmentMetadata>> deleteLogSegments( final List<LogSegmentMetadata> logs) { if (LOG.isTraceEnabled()) { LOG.trace("Purging logs for {} : {}", getFullyQualifiedName(), logs); } return FutureUtils.processList(logs, - new Function<LogSegmentMetadata, Future<LogSegmentMetadata>>() { - @Override - public Future<LogSegmentMetadata> apply(LogSegmentMetadata segment) { - return deleteLogSegment(segment); - } - }, scheduler); + segment -> deleteLogSegment(segment), scheduler); } - private Future<LogSegmentMetadata> deleteLogSegment( + private CompletableFuture<LogSegmentMetadata> deleteLogSegment( final LogSegmentMetadata ledgerMetadata) { LOG.info("Deleting ledger {} for {}", ledgerMetadata, getFullyQualifiedName()); - final Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>(); + final CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>(); final Stopwatch stopwatch = Stopwatch.createStarted(); - promise.addEventListener(new FutureEventListener<LogSegmentMetadata>() { + promise.whenComplete(new FutureEventListener<LogSegmentMetadata>() { @Override public void onSuccess(LogSegmentMetadata segment) { deleteOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); @@ -1227,10 +1205,10 @@ class BKLogWriteHandler extends BKLogHandler { } }); entryStore.deleteLogSegment(ledgerMetadata) - .addEventListener(new FutureEventListener<LogSegmentMetadata>() { + .whenComplete(new FutureEventListener<LogSegmentMetadata>() { @Override public void onFailure(Throwable cause) { - FutureUtils.setException(promise, cause); + FutureUtils.completeExceptionally(promise, cause); } @Override @@ -1242,14 +1220,14 @@ class BKLogWriteHandler extends BKLogHandler { } private void deleteLogSegmentMetadata(final LogSegmentMetadata segmentMetadata, - final Promise<LogSegmentMetadata> promise) { + final CompletableFuture<LogSegmentMetadata> promise) { Transaction<Object> deleteTxn = metadataStore.transaction(); metadataStore.deleteLogSegment(deleteTxn, segmentMetadata, new Transaction.OpListener<Void>() { @Override public void onCommit(Void r) { // purge log segment removeLogSegmentFromCache(segmentMetadata.getZNodeName()); - promise.setValue(segmentMetadata); + promise.complete(segmentMetadata); } @Override @@ -1257,12 +1235,12 @@ class BKLogWriteHandler extends BKLogHandler { if (t instanceof LogSegmentNotFoundException) { // purge log segment removeLogSegmentFromCache(segmentMetadata.getZNodeName()); - promise.setValue(segmentMetadata); + promise.complete(segmentMetadata); return; } else { LOG.error("Couldn't purge {} for {}: with error {}", new Object[]{ segmentMetadata, getFullyQualifiedName(), t }); - promise.setException(t); + promise.completeExceptionally(t); } } }); @@ -1270,14 +1248,14 @@ class BKLogWriteHandler extends BKLogHandler { } @Override - public Future<Void> asyncClose() { + public CompletableFuture<Void> asyncClose() { return Utils.closeSequence(scheduler, lock, logSegmentAllocator); } @Override - public Future<Void> asyncAbort() { + public CompletableFuture<Void> asyncAbort() { return asyncClose(); }
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java index bf89823..47301b5 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java @@ -20,17 +20,15 @@ package org.apache.distributedlog; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Ticker; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.api.AsyncLogReader; +import org.apache.distributedlog.api.LogReader; import org.apache.distributedlog.exceptions.EndOfStreamException; import org.apache.distributedlog.exceptions.IdleReaderException; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Future; -import com.twitter.util.Promise; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.versioning.Versioned; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; import java.io.IOException; import java.util.LinkedList; @@ -48,7 +46,7 @@ class BKSyncLogReader implements LogReader, AsyncNotification { private final AtomicReference<IOException> readerException = new AtomicReference<IOException>(null); private final int maxReadAheadWaitTime; - private Promise<Void> closeFuture; + private CompletableFuture<Void> closeFuture; private final Optional<Long> startTransactionId; private boolean positioned = false; private Entry.Reader currentEntry = null; @@ -101,13 +99,10 @@ class BKSyncLogReader implements LogReader, AsyncNotification { bkdlm.alertStatsLogger); readHandler.registerListener(readAheadReader); readHandler.asyncStartFetchLogSegments() - .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() { - @Override - public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) { - readAheadReader.addStateChangeNotification(BKSyncLogReader.this); - readAheadReader.start(logSegments.getValue()); - return BoxedUnit.UNIT; - } + .thenApply(logSegments -> { + readAheadReader.addStateChangeNotification(BKSyncLogReader.this); + readAheadReader.start(logSegments.getValue()); + return null; }); } @@ -234,26 +229,28 @@ class BKSyncLogReader implements LogReader, AsyncNotification { } @Override - public Future<Void> asyncClose() { - Promise<Void> closePromise; + public CompletableFuture<Void> asyncClose() { + CompletableFuture<Void> closePromise; synchronized (this) { if (null != closeFuture) { return closeFuture; } - closeFuture = closePromise = new Promise<Void>(); + closeFuture = closePromise = new CompletableFuture<Void>(); } readHandler.unregisterListener(readAheadReader); readAheadReader.removeStateChangeNotification(this); - Utils.closeSequence(bkdlm.getScheduler(), true, - readAheadReader, - readHandler - ).proxyTo(closePromise); + FutureUtils.proxyTo( + Utils.closeSequence(bkdlm.getScheduler(), true, + readAheadReader, + readHandler + ), + closePromise); return closePromise; } @Override public void close() throws IOException { - FutureUtils.result(asyncClose()); + Utils.ioResult(asyncClose()); } // http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java index 7d33d12..15296b2 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java @@ -17,11 +17,11 @@ */ package org.apache.distributedlog; -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.util.FutureUtils; - import java.io.IOException; import java.util.List; +import org.apache.distributedlog.api.LogWriter; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.util.Utils; class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter { @@ -59,7 +59,7 @@ class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter { */ @Override public void markEndOfStream() throws IOException { - FutureUtils.result(getLedgerWriter(DistributedLogConstants.MAX_TXID, true).markEndOfStream()); + Utils.ioResult(getLedgerWriter(DistributedLogConstants.MAX_TXID, true).markEndOfStream()); closeAndComplete(); } @@ -73,7 +73,7 @@ class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter { long highestTransactionId = 0; BKLogSegmentWriter writer = getCachedLogWriter(); if (null != writer) { - highestTransactionId = Math.max(highestTransactionId, FutureUtils.result(writer.flush())); + highestTransactionId = Math.max(highestTransactionId, Utils.ioResult(writer.flush())); } return highestTransactionId; } @@ -93,7 +93,7 @@ class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter { long highestTransactionId = 0; BKLogSegmentWriter writer = getCachedLogWriter(); if (null != writer) { - highestTransactionId = Math.max(highestTransactionId, FutureUtils.result(writer.commit())); + highestTransactionId = Math.max(highestTransactionId, Utils.ioResult(writer.commit())); LOG.debug("FlushAndSync Completed"); } else { LOG.debug("FlushAndSync Completed - Nothing to Flush"); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java index 6ed662b..3715327 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java @@ -17,30 +17,28 @@ */ package org.apache.distributedlog; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; - +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; class BKTransmitPacket { private final EntryBuffer recordSet; private final long transmitTime; - private final Promise<Integer> transmitComplete; + private final CompletableFuture<Integer> transmitComplete; BKTransmitPacket(EntryBuffer recordSet) { this.recordSet = recordSet; this.transmitTime = System.nanoTime(); - this.transmitComplete = new Promise<Integer>(); + this.transmitComplete = new CompletableFuture<Integer>(); } EntryBuffer getRecordSet() { return recordSet; } - Promise<Integer> getTransmitFuture() { + CompletableFuture<Integer> getTransmitFuture() { return transmitComplete; } @@ -53,7 +51,7 @@ class BKTransmitPacket { * transmit result code. */ public void notifyTransmitComplete(int transmitResult) { - transmitComplete.setValue(transmitResult); + transmitComplete.complete(transmitResult); } /** @@ -66,7 +64,7 @@ class BKTransmitPacket { * @see #awaitTransmitComplete(long, TimeUnit) */ void addTransmitCompleteListener(FutureEventListener<Integer> transmitCompleteListener) { - transmitComplete.addEventListener(transmitCompleteListener); + transmitComplete.whenComplete(transmitCompleteListener); } /** @@ -79,8 +77,7 @@ class BKTransmitPacket { */ int awaitTransmitComplete(long timeout, TimeUnit unit) throws Exception { - return Await.result(transmitComplete, - Duration.fromTimeUnit(timeout, unit)); + return FutureUtils.result(transmitComplete, timeout, unit); } public long getTransmitTime() { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java index a7b17f4..2ea3b5d 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java @@ -18,17 +18,7 @@ package org.apache.distributedlog; import com.google.common.base.Optional; -import org.apache.distributedlog.ZooKeeperClient.Credentials; -import org.apache.distributedlog.ZooKeeperClient.DigestCredentials; -import org.apache.distributedlog.exceptions.AlreadyClosedException; -import org.apache.distributedlog.exceptions.DLInterruptedException; -import org.apache.distributedlog.exceptions.ZKException; -import org.apache.distributedlog.net.NetUtils; -import org.apache.distributedlog.util.ConfUtils; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import com.twitter.util.Return; -import com.twitter.util.Throw; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -41,6 +31,14 @@ import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.bookkeeper.zookeeper.RetryPolicy; import org.apache.commons.configuration.ConfigurationException; +import org.apache.distributedlog.ZooKeeperClient.Credentials; +import org.apache.distributedlog.ZooKeeperClient.DigestCredentials; +import org.apache.distributedlog.exceptions.AlreadyClosedException; +import org.apache.distributedlog.exceptions.DLInterruptedException; +import org.apache.distributedlog.exceptions.ZKException; +import org.apache.distributedlog.net.NetUtils; +import org.apache.distributedlog.util.ConfUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.zookeeper.KeeperException; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.util.HashedWheelTimer; @@ -198,52 +196,52 @@ public class BookKeeperClient { } // Util functions - public Future<LedgerHandle> createLedger(int ensembleSize, - int writeQuorumSize, - int ackQuorumSize) { + public CompletableFuture<LedgerHandle> createLedger(int ensembleSize, + int writeQuorumSize, + int ackQuorumSize) { BookKeeper bk; try { bk = get(); } catch (IOException ioe) { - return Future.exception(ioe); + return FutureUtils.exception(ioe); } - final Promise<LedgerHandle> promise = new Promise<LedgerHandle>(); + final CompletableFuture<LedgerHandle> promise = new CompletableFuture<LedgerHandle>(); bk.asyncCreateLedger(ensembleSize, writeQuorumSize, ackQuorumSize, BookKeeper.DigestType.CRC32, passwd, new AsyncCallback.CreateCallback() { @Override public void createComplete(int rc, LedgerHandle lh, Object ctx) { if (BKException.Code.OK == rc) { - promise.updateIfEmpty(new Return<LedgerHandle>(lh)); + promise.complete(lh); } else { - promise.updateIfEmpty(new Throw<LedgerHandle>(BKException.create(rc))); + promise.completeExceptionally(BKException.create(rc)); } } }, null); return promise; } - public Future<Void> deleteLedger(long lid, + public CompletableFuture<Void> deleteLedger(long lid, final boolean ignoreNonExistentLedger) { BookKeeper bk; try { bk = get(); } catch (IOException ioe) { - return Future.exception(ioe); + return FutureUtils.exception(ioe); } - final Promise<Void> promise = new Promise<Void>(); + final CompletableFuture<Void> promise = new CompletableFuture<Void>(); bk.asyncDeleteLedger(lid, new AsyncCallback.DeleteCallback() { @Override public void deleteComplete(int rc, Object ctx) { if (BKException.Code.OK == rc) { - promise.updateIfEmpty(new Return<Void>(null)); + promise.complete(null); } else if (BKException.Code.NoSuchLedgerExistsException == rc) { if (ignoreNonExistentLedger) { - promise.updateIfEmpty(new Return<Void>(null)); + promise.complete(null); } else { - promise.updateIfEmpty(new Throw<Void>(BKException.create(rc))); + promise.completeExceptionally(BKException.create(rc)); } } else { - promise.updateIfEmpty(new Throw<Void>(BKException.create(rc))); + promise.completeExceptionally(BKException.create(rc)); } } }, null); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java index 0cb608f..3269f57 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java @@ -22,7 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import org.apache.distributedlog.bk.QuorumConfig; import org.apache.distributedlog.feature.DefaultFeatureProvider; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.net.DNSResolverForRacks; import org.apache.distributedlog.net.DNSResolverForRows; import org.apache.bookkeeper.conf.ClientConfiguration; @@ -1351,7 +1351,6 @@ public class DistributedLogConfiguration extends CompositeConfiguration { * <p> * The setting is only applied for v2 implementation. * - * @see org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor * @return number of resource release threads used by distributedlog namespace. */ public int getNumResourceReleaseThreads() { @@ -3048,7 +3047,7 @@ public class DistributedLogConfiguration extends CompositeConfiguration { /** * Whether to enable per stream stat or not. * - * @deprecated please use {@link DistributedLogNamespaceBuilder#perLogStatsLogger(StatsLogger)} + * @deprecated please use {@link NamespaceBuilder#perLogStatsLogger(StatsLogger)} * @return flag to enable per stream stat. */ public boolean getEnablePerStreamStat() { @@ -3058,7 +3057,7 @@ public class DistributedLogConfiguration extends CompositeConfiguration { /** * Set the flag to enable per stream stat or not. * - * @deprecated please use {@link DistributedLogNamespaceBuilder#perLogStatsLogger(StatsLogger)} + * @deprecated please use {@link NamespaceBuilder#perLogStatsLogger(StatsLogger)} * @param enabled * flag to enable/disable per stream stat. * @return dl configuration. http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java deleted file mode 100644 index 7d33e9c..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java +++ /dev/null @@ -1,308 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog; - -import org.apache.distributedlog.callback.LogSegmentListener; -import org.apache.distributedlog.io.AsyncCloseable; -import org.apache.distributedlog.namespace.NamespaceDriver; -import org.apache.distributedlog.subscription.SubscriptionStateStore; -import org.apache.distributedlog.subscription.SubscriptionsStore; -import com.twitter.util.Future; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; - -/** - * A DistributedLogManager is responsible for managing a single place of storing - * edit logs. It may correspond to multiple files, a backup node, etc. - * Even when the actual underlying storage is rolled, or failed and restored, - * each conceptual place of storage corresponds to exactly one instance of - * this class, which is created when the EditLog is first opened. - */ -public interface DistributedLogManager extends AsyncCloseable, Closeable { - - /** - * Get the name of the stream managed by this log manager - * @return streamName - */ - public String getStreamName(); - - /** - * Get the namespace driver used by this manager. - * - * @return the namespace driver - */ - public NamespaceDriver getNamespaceDriver(); - - /** - * Get log segments. - * - * @return log segments - * @throws IOException - */ - public List<LogSegmentMetadata> getLogSegments() throws IOException; - - /** - * Register <i>listener</i> on log segment updates of this stream. - * - * @param listener - * listener to receive update log segment list. - */ - public void registerListener(LogSegmentListener listener) throws IOException ; - - /** - * Unregister <i>listener</i> on log segment updates from this stream. - * - * @param listener - * listener to receive update log segment list. - */ - public void unregisterListener(LogSegmentListener listener); - - /** - * Open async log writer to write records to the log stream. - * - * @return result represents the open result - */ - public Future<AsyncLogWriter> openAsyncLogWriter(); - - /** - * Begin writing to the log stream identified by the name - * - * @return the writer interface to generate log records - */ - public LogWriter startLogSegmentNonPartitioned() throws IOException; - - /** - * Begin writing to the log stream identified by the name - * - * @return the writer interface to generate log records - */ - // @Deprecated - public AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException; - - /** - * Begin appending to the end of the log stream which is being treated as a sequence of bytes - * - * @return the writer interface to generate log records - */ - public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException; - - /** - * Get a reader to read a log stream as a sequence of bytes - * - * @return the writer interface to generate log records - */ - public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException; - - /** - * Get the input stream starting with fromTxnId for the specified log - * - * @param fromTxnId - the first transaction id we want to read - * @return the stream starting with transaction fromTxnId - * @throws IOException if a stream cannot be found. - */ - public LogReader getInputStream(long fromTxnId) - throws IOException; - - public LogReader getInputStream(DLSN fromDLSN) throws IOException; - - /** - * Open an async log reader to read records from a log starting from <code>fromTxnId</code>. - * - * @param fromTxnId - * transaction id to start reading from - * @return async log reader - */ - public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId); - - /** - * Open an async log reader to read records from a log starting from <code>fromDLSN</code> - * - * @param fromDLSN - * dlsn to start reading from - * @return async log reader - */ - public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN); - - // @Deprecated - public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException; - - // @Deprecated - public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException; - - public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN); - - /** - * Get a log reader with lock starting from <i>fromDLSN</i> and using <i>subscriberId</i>. - * If two readers tried to open using same subscriberId, one would succeed, while the other - * will be blocked until it gets the lock. - * - * @param fromDLSN - * start dlsn - * @param subscriberId - * subscriber id - * @return async log reader - */ - public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId); - - /** - * Get a log reader using <i>subscriberId</i> with lock. The reader will start reading from - * its last commit position recorded in subscription store. If no last commit position found - * in subscription store, it would start reading from head of the stream. - * - * If the two readers tried to open using same subscriberId, one would succeed, while the other - * will be blocked until it gets the lock. - * - * @param subscriberId - * subscriber id - * @return async log reader - */ - public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId); - - /** - * Get the {@link DLSN} of first log record whose transaction id is not less than <code>transactionId</code>. - * - * @param transactionId - * transaction id - * @return dlsn of first log record whose transaction id is not less than transactionId. - */ - public Future<DLSN> getDLSNNotLessThanTxId(long transactionId); - - /** - * Get the last log record in the stream - * - * @return the last log record in the stream - * @throws IOException if a stream cannot be found. - */ - public LogRecordWithDLSN getLastLogRecord() - throws IOException; - - /** - * Get the earliest Transaction Id available in the log - * - * @return earliest transaction id - * @throws IOException - */ - public long getFirstTxId() throws IOException; - - /** - * Get Latest Transaction Id in the log - * - * @return latest transaction id - * @throws IOException - */ - public long getLastTxId() throws IOException; - - /** - * Get Latest DLSN in the log - * - * @return last dlsn - * @throws IOException - */ - public DLSN getLastDLSN() throws IOException; - - /** - * Get Latest log record with DLSN in the log - async - * - * @return latest log record with DLSN - */ - public Future<LogRecordWithDLSN> getLastLogRecordAsync(); - - /** - * Get Latest Transaction Id in the log - async - * - * @return latest transaction id - */ - public Future<Long> getLastTxIdAsync(); - - /** - * Get first DLSN in the log. - * - * @return first dlsn in the stream - */ - public Future<DLSN> getFirstDLSNAsync(); - - /** - * Get Latest DLSN in the log - async - * - * @return latest transaction id - */ - public Future<DLSN> getLastDLSNAsync(); - - /** - * Get the number of log records in the active portion of the log - * Any log segments that have already been truncated will not be included - * - * @return number of log records - * @throws IOException - */ - public long getLogRecordCount() throws IOException; - - /** - * Get the number of log records in the active portion of the log - async. - * Any log segments that have already been truncated will not be included - * - * @return future number of log records - * @throws IOException - */ - public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN); - - /** - * Run recovery on the log. - * - * @throws IOException - */ - public void recover() throws IOException; - - /** - * Check if an end of stream marker was added to the stream - * A stream with an end of stream marker cannot be appended to - * - * @return true if the marker was added to the stream, false otherwise - * @throws IOException - */ - public boolean isEndOfStreamMarked() throws IOException; - - /** - * Delete the log. - * - * @throws IOException if the deletion fails - */ - public void delete() throws IOException; - - /** - * The DistributedLogManager may archive/purge any logs for transactionId - * less than or equal to minImageTxId. - * This is to be used only when the client explicitly manages deletion. If - * the cleanup policy is based on sliding time window, then this method need - * not be called. - * - * @param minTxIdToKeep the earliest txid that must be retained - * @throws IOException if purging fails - */ - public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException; - - /** - * Get the subscriptions store provided by the distributedlog manager. - * - * @return subscriptions store manages subscriptions for current stream. - */ - public SubscriptionsStore getSubscriptionsStore(); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java index 617282c..30cd499 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java @@ -19,10 +19,10 @@ package org.apache.distributedlog; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.exceptions.LogRecordTooLongException; import org.apache.distributedlog.exceptions.WriteException; import org.apache.distributedlog.io.CompressionCodec; -import com.twitter.util.Promise; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; @@ -342,7 +342,7 @@ public class Entry { * @throws LogRecordTooLongException if the record is too long * @throws WriteException when encountered exception writing the record */ - void writeRecord(LogRecord record, Promise<DLSN> transmitPromise) + void writeRecord(LogRecord record, CompletableFuture<DLSN> transmitPromise) throws LogRecordTooLongException, WriteException; /** http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java index aed47fc..09301aa 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java @@ -30,10 +30,10 @@ import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.distributedlog.annotations.DistributedLogAnnotations.Compression; +import org.apache.distributedlog.common.annotations.DistributedLogAnnotations.Compression; import org.apache.distributedlog.io.CompressionCodec; import org.apache.distributedlog.io.CompressionUtils; -import org.apache.distributedlog.util.BitMaskUtils; +import org.apache.distributedlog.common.util.BitMaskUtils; /** * An enveloped entry written to BookKeeper. http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java index 54858d7..18645d4 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java @@ -17,6 +17,7 @@ */ package org.apache.distributedlog; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.Entry.Writer; import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException; import org.apache.distributedlog.exceptions.LogRecordTooLongException; @@ -24,7 +25,6 @@ import org.apache.distributedlog.exceptions.WriteCancelledException; import org.apache.distributedlog.exceptions.WriteException; import org.apache.distributedlog.io.Buffer; import org.apache.distributedlog.io.CompressionCodec; -import com.twitter.util.Promise; import org.apache.bookkeeper.stats.StatsLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,9 +46,9 @@ class EnvelopedEntryWriter implements Writer { private static class WriteRequest { private final int numRecords; - private final Promise<DLSN> promise; + private final CompletableFuture<DLSN> promise; - WriteRequest(int numRecords, Promise<DLSN> promise) { + WriteRequest(int numRecords, CompletableFuture<DLSN> promise) { this.numRecords = numRecords; this.promise = promise; } @@ -89,7 +89,7 @@ class EnvelopedEntryWriter implements Writer { @Override public synchronized void writeRecord(LogRecord record, - Promise<DLSN> transmitPromise) + CompletableFuture<DLSN> transmitPromise) throws LogRecordTooLongException, WriteException { int logRecordSize = record.getPersistentSize(); if (logRecordSize > MAX_LOGRECORD_SIZE) { @@ -121,7 +121,7 @@ class EnvelopedEntryWriter implements Writer { private synchronized void satisfyPromises(long lssn, long entryId) { long nextSlotId = 0; for (WriteRequest request : writeRequests) { - request.promise.setValue(new DLSN(lssn, entryId, nextSlotId)); + request.promise.complete(new DLSN(lssn, entryId, nextSlotId)); nextSlotId += request.numRecords; } writeRequests.clear(); @@ -129,7 +129,7 @@ class EnvelopedEntryWriter implements Writer { private synchronized void cancelPromises(Throwable reason) { for (WriteRequest request : writeRequests) { - request.promise.setException(reason); + request.promise.completeExceptionally(reason); } writeRequests.clear(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java deleted file mode 100644 index baf3182..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog; - -import org.apache.distributedlog.io.AsyncCloseable; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; - -/** - * <i>LogReader</i> is a `synchronous` reader reading records from a DL log. - * - * <h3>Lifecycle of a Reader</h3> - * - * A reader is a <i>sequential</i> reader that read records from a DL log starting - * from a given position. The position could be a <i>DLSN</i> (via {@link DistributedLogManager#getInputStream(DLSN)} - * or a <i>Transaction ID</i> (via {@link DistributedLogManager#getInputStream(long)}. - * <p> - * After the reader is open, it could call {@link #readNext(boolean)} or {@link #readBulk(boolean, int)} - * to read records out the log from provided position. - * <p> - * Closing the reader (via {@link #close()} will release all the resources occupied - * by this reader instance. - * <p> - * Exceptions could be thrown during reading records. Once the exception is thrown, - * the reader is set to an error state and it isn't usable anymore. It is the application's - * responsibility to handle the exceptions and re-create readers if necessary. - * <p> - * Example: - * <pre> - * DistributedLogManager dlm = ...; - * long nextTxId = ...; - * LogReader reader = dlm.getInputStream(nextTxId); - * - * while (true) { // keep reading & processing records - * LogRecord record; - * try { - * record = reader.readNext(false); - * nextTxId = record.getTransactionId(); - * // process the record - * ... - * } catch (IOException ioe) { - * // handle the exception - * ... - * reader = dlm.getInputStream(nextTxId + 1); - * } - * } - * - * </pre> - * - * <h3>Read Records</h3> - * - * Reading records from an <i>endless</i> log in `synchronous` way isn't as - * trivial as in `asynchronous` way (via {@link AsyncLogReader}. Because it - * lacks of callback mechanism. LogReader introduces a flag `nonBlocking` on - * controlling the <i>waiting</i> behavior on `synchronous` reads. - * - * <h4>Blocking vs NonBlocking</h4> - * - * <i>Blocking</i> (nonBlocking = false) means the reads will wait for records - * before returning read calls. While <i>NonBlocking</i> (nonBlocking = true) - * means the reads will only check readahead cache and return whatever records - * available in the readahead cache. - * <p> - * The <i>waiting</i> period varies in <i>blocking</i> mode. If the reader is - * catching up with writer (there are records in the log), the read call will - * wait until records are read and returned. If the reader is caught up with - * writer (there are no more records in the log at read time), the read call - * will wait for a small period of time (defined in - * {@link DistributedLogConfiguration#getReadAheadWaitTime()} and return whatever - * records available in the readahead cache. In other words, if a reader sees - * no record on blocking reads, it means the reader is `caught-up` with the - * writer. - * <p> - * <i>Blocking</i> and <i>NonBlocking</i> modes are useful for building replicated - * state machines. Applications could use <i>blocking</i> reads till caught up - * with latest data. Once they are caught up with latest data, they could start - * serving their service and turn to <i>non-blocking</i> read mode and tail read - * data from the logs. - * <p> - * See examples below. - * - * <h4>Read Single Record</h4> - * - * {@link #readNext(boolean)} is reading individual records from a DL log. - * - * <pre> - * LogReader reader = ... - * - * // keep reading records in blocking way until no records available in the log - * LogRecord record = reader.readNext(false); - * while (null != record) { - * // process the record - * ... - * // read next record - * records = reader.readNext(false); - * } - * - * ... - * - * // reader is caught up with writer, doing non-blocking reads to tail the log - * while (true) { - * record = reader.readNext(true) - * // process the new records - * ... - * } - * </pre> - * - * <h4>Read Batch of Records</h4> - * - * {@link #readBulk(boolean, int)} is a convenient way to read a batch of records - * from a DL log. - * - * <pre> - * LogReader reader = ... - * int N = 10; - * - * // keep reading N records in blocking way until no records available in the log - * List<LogRecord> records = reader.readBulk(false, N); - * while (!records.isEmpty()) { - * // process the list of records - * ... - * if (records.size() < N) { // no more records available in the log - * break; - * } - * // read next N records - * records = reader.readBulk(false, N); - * } - * - * ... - * - * // reader is caught up with writer, doing non-blocking reads to tail the log - * while (true) { - * records = reader.readBulk(true, N) - * // process the new records - * ... - * } - * - * </pre> - * - * <p> - * NOTE: Extending {@link AsyncCloseable}: BKSyncLogReader is implemented based on BKAsyncLogReader, exposing - * the {@link AsyncCloseable} interface so the reader could be closed asynchronously - * - * @see AsyncLogReader - */ -public interface LogReader extends Closeable, AsyncCloseable { - - /** - * Read the next log record from the stream. - * <p> - * If <i>nonBlocking</i> is set to true, the call returns immediately by just polling - * records from read ahead cache. It would return <i>null</i> if there isn't any records - * available in the read ahead cache. - * <p> - * If <i>nonBlocking</i> is set to false, it would does blocking call. The call will - * block until return a record if there are records in the stream (aka catching up). - * Otherwise it would wait up to {@link DistributedLogConfiguration#getReadAheadWaitTime()} - * milliseconds and return null if there isn't any more records in the stream. - * - * @param nonBlocking should the read make blocking calls to the backend or rely on the - * readAhead cache - * @return an operation from the stream or null if at end of stream - * @throws IOException if there is an error reading from the stream - */ - public LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException; - - /** - * Read the next <i>numLogRecords</i> log records from the stream - * - * @param nonBlocking should the read make blocking calls to the backend or rely on the - * readAhead cache - * @param numLogRecords maximum number of log records returned by this call. - * @return an operation from the stream or empty list if at end of stream - * @throws IOException if there is an error reading from the stream - * @see #readNext(boolean) - */ - public List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java index c5050ec..462ddaa 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java @@ -23,14 +23,13 @@ import java.util.Comparator; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.exceptions.DLInterruptedException; import org.apache.distributedlog.exceptions.LogSegmentNotFoundException; import org.apache.distributedlog.exceptions.UnsupportedMetadataVersionException; import org.apache.distributedlog.exceptions.ZKException; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Future; -import com.twitter.util.Promise; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -590,21 +589,21 @@ public class LogSegmentMetadata { .build(); } - public static Future<LogSegmentMetadata> read(ZooKeeperClient zkc, String path) { + public static CompletableFuture<LogSegmentMetadata> read(ZooKeeperClient zkc, String path) { return read(zkc, path, false); } - public static Future<LogSegmentMetadata> read(ZooKeeperClient zkc, String path, final boolean skipMinVersionCheck) { - final Promise<LogSegmentMetadata> result = new Promise<LogSegmentMetadata>(); + public static CompletableFuture<LogSegmentMetadata> read(ZooKeeperClient zkc, String path, final boolean skipMinVersionCheck) { + final CompletableFuture<LogSegmentMetadata> result = new CompletableFuture<LogSegmentMetadata>(); try { zkc.get().getData(path, false, new AsyncCallback.DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if (KeeperException.Code.OK.intValue() != rc) { if (KeeperException.Code.NONODE.intValue() == rc) { - FutureUtils.setException(result, new LogSegmentNotFoundException(path)); + FutureUtils.completeExceptionally(result, new LogSegmentNotFoundException(path)); } else { - FutureUtils.setException(result, + FutureUtils.completeExceptionally(result, new ZKException("Failed to read log segment metadata from " + path, KeeperException.Code.get(rc))); } @@ -612,17 +611,17 @@ public class LogSegmentMetadata { } try { LogSegmentMetadata metadata = parseData(path, data, skipMinVersionCheck); - FutureUtils.setValue(result, metadata); + FutureUtils.complete(result, metadata); } catch (IOException ie) { LOG.error("Error on parsing log segment metadata from {} : ", path, ie); - result.setException(ie); + result.completeExceptionally(ie); } } }, null); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - result.setException(FutureUtils.zkException(e, path)); + result.completeExceptionally(Utils.zkException(e, path)); } catch (InterruptedException e) { - result.setException(FutureUtils.zkException(e, path)); + result.completeExceptionally(Utils.zkException(e, path)); } return result; }