DL-162: Use log segment entry store interface - Use log segment entry store interface - Delete the old readahead implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/d871e657 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/d871e657 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/d871e657 Branch: refs/heads/master Commit: d871e6570fe49cab56fcec37d3a8a6bd91afbe4d Parents: 47622a6 Author: Sijie Guo <sij...@twitter.com> Authored: Wed Dec 28 17:09:57 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Thu Dec 29 02:13:02 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/BKAsyncLogReader.java | 68 +- .../distributedlog/BKDistributedLogManager.java | 59 +- .../BKDistributedLogNamespace.java | 10 - .../twitter/distributedlog/BKLogHandler.java | 33 +- .../distributedlog/BKLogReadHandler.java | 15 +- .../distributedlog/BKLogWriteHandler.java | 46 +- .../distributedlog/LedgerDescriptor.java | 67 - .../distributedlog/LedgerHandleCache.java | 463 ------ .../twitter/distributedlog/ReadAheadCache.java | 233 --- .../com/twitter/distributedlog/ReadUtils.java | 204 +-- .../admin/DistributedLogAdmin.java | 52 +- .../impl/logsegment/BKLogSegmentEntryStore.java | 136 +- .../BKLogSegmentRandomAccessEntryReader.java | 119 ++ .../logsegment/LogSegmentEntryStore.java | 17 + .../LogSegmentRandomAccessEntryReader.java | 47 + .../readahead/ReadAheadPhase.java | 45 - .../readahead/ReadAheadTracker.java | 104 -- .../readahead/ReadAheadWorker.java | 1503 ------------------ .../stats/ReadAheadExceptionsLogger.java | 60 - .../distributedlog/TestLedgerHandleCache.java | 180 --- .../TestReadAheadEntryReader.java | 2 +- .../twitter/distributedlog/TestReadUtils.java | 44 +- .../twitter/distributedlog/admin/TestDLCK.java | 9 +- .../logsegment/TestBKLogSegmentEntryReader.java | 2 +- .../mapreduce/DistributedLogInputFormat.java | 2 +- .../mapreduce/LogSegmentReader.java | 2 +- .../mapreduce/LogSegmentSplit.java | 4 +- 27 files changed, 509 insertions(+), 3017 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java index 18d2e15..b9d0365 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java @@ -409,43 +409,39 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit); if (null == readAheadReader) { - try { - final ReadAheadEntryReader readAheadEntryReader = this.readAheadReader = new ReadAheadEntryReader( - getStreamName(), - getStartDLSN(), - bkDistributedLogManager.getConf(), - readHandler, - bkDistributedLogManager.getReaderEntryStore(), - bkDistributedLogManager.getScheduler(), - Ticker.systemTicker(), - bkDistributedLogManager.alertStatsLogger); - readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() { - @Override - public void onSuccess(Void value) { - try { - readHandler.registerListener(readAheadEntryReader); - readHandler.asyncStartFetchLogSegments() - .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() { - @Override - public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) { - readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this); - readAheadEntryReader.start(logSegments.getValue()); - return BoxedUnit.UNIT; - } - }); - } catch (Exception exc) { - notifyOnError(exc); - } + final ReadAheadEntryReader readAheadEntryReader = this.readAheadReader = new ReadAheadEntryReader( + getStreamName(), + getStartDLSN(), + bkDistributedLogManager.getConf(), + readHandler, + bkDistributedLogManager.getReaderEntryStore(), + bkDistributedLogManager.getScheduler(), + Ticker.systemTicker(), + bkDistributedLogManager.alertStatsLogger); + readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() { + @Override + public void onSuccess(Void value) { + try { + readHandler.registerListener(readAheadEntryReader); + readHandler.asyncStartFetchLogSegments() + .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() { + @Override + public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) { + readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this); + readAheadEntryReader.start(logSegments.getValue()); + return BoxedUnit.UNIT; + } + }); + } catch (Exception exc) { + notifyOnError(exc); } + } - @Override - public void onFailure(Throwable cause) { - notifyOnError(cause); - } - }); - } catch (IOException ioe) { - notifyOnError(ioe); - } + @Override + public void onFailure(Throwable cause) { + notifyOnError(cause); + } + }); } if (checkClosedOrInError("readNext")) { @@ -598,6 +594,8 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { } lastProcessTime.reset().start(); + lastProcessTime.reset().start(); + // If the oldest pending promise is interrupted then we must mark // the reader in error and abort all pending reads since we dont // know the last consumed read http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java index 219c0cf..d20cc6a 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java @@ -50,7 +50,6 @@ import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; import com.twitter.distributedlog.metadata.BKDLConfig; import com.twitter.distributedlog.metadata.LogStreamMetadataStore; import com.twitter.distributedlog.stats.BroadCastStatsLogger; -import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger; import com.twitter.distributedlog.subscription.SubscriptionStateStore; import com.twitter.distributedlog.subscription.SubscriptionsStore; import com.twitter.distributedlog.subscription.ZKSubscriptionStateStore; @@ -72,8 +71,8 @@ import com.twitter.util.Future; import com.twitter.util.FuturePool; import com.twitter.util.FutureEventListener; import com.twitter.util.Promise; -import org.apache.bookkeeper.stats.AlertStatsLogger; import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.stats.AlertStatsLogger; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; @@ -132,7 +131,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL } }; - private final String clientId; private final int regionId; private final String streamIdentifier; @@ -160,10 +158,11 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL // instantiating readers or writers. private final BookKeeperClientBuilder writerBKCBuilder; private final BookKeeperClient writerBKC; + private final LogSegmentEntryStore writerEntryStore; private final boolean ownWriterBKC; private final BookKeeperClientBuilder readerBKCBuilder; private final BookKeeperClient readerBKC; - private LogSegmentEntryStore readerEntryStore = null; + private final LogSegmentEntryStore readerEntryStore; private final boolean ownReaderBKC; // @@ -179,7 +178,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL private BKLogReadHandler readHandlerForListener = null; private FuturePool readerFuturePool = null; private final PendingReaders pendingReaders; - private final ReadAheadExceptionsLogger readAheadExceptionsLogger; // Failure Injector private final AsyncFailureInjector failureInjector; @@ -230,7 +228,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL null, null, null, - new ReadAheadExceptionsLogger(statsLogger), DistributedLogConstants.UNKNOWN_CLIENT_ID, DistributedLogConstants.LOCAL_REGION_ID, null, @@ -259,7 +256,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL * @param readAheadScheduler readAhead scheduler used by readers * @param channelFactory client socket channel factory to build bookkeeper clients * @param requestTimer request timer to build bookkeeper clients - * @param readAheadExceptionsLogger stats logger to record readahead exceptions * @param clientId client id that used to initiate the locks * @param regionId region id that would be encrypted as part of log segment metadata * to indicate which region that the log segment will be created @@ -287,7 +283,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL OrderedScheduler readAheadScheduler, ClientSocketChannelFactory channelFactory, HashedWheelTimer requestTimer, - ReadAheadExceptionsLogger readAheadExceptionsLogger, String clientId, Integer regionId, LedgerAllocator ledgerAllocator, @@ -296,7 +291,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL StatsLogger statsLogger, StatsLogger perLogStatsLogger) throws IOException { super(name, conf, uri, writerZKCBuilder, readerZKCBuilder, statsLogger); - Preconditions.checkNotNull(readAheadExceptionsLogger, "No ReadAhead Stats Logger Provided."); this.conf = conf; this.dynConf = dynConf; this.scheduler = scheduler; @@ -366,6 +360,12 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL this.ownWriterBKC = false; } this.writerBKC = this.writerBKCBuilder.build(); + this.writerEntryStore = new BKLogSegmentEntryStore( + conf, + writerBKC, + scheduler, + statsLogger, + failureInjector); // create the bkc for readers if (null == readerBKCBuilder) { @@ -395,13 +395,18 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL this.ownReaderBKC = false; } this.readerBKC = this.readerBKCBuilder.build(); + this.readerEntryStore = new BKLogSegmentEntryStore( + conf, + readerBKC, + scheduler, + statsLogger, + failureInjector); // Feature Provider this.featureProvider = featureProvider; // Stats this.alertStatsLogger = new AlertStatsLogger(this.perLogStatsLogger, "dl_alert"); - this.readAheadExceptionsLogger = readAheadExceptionsLogger; } @VisibleForTesting @@ -431,15 +436,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL return this.readerBKC; } - synchronized LogSegmentEntryStore getReaderEntryStore() throws IOException { - if (null == readerEntryStore) { - readerEntryStore = new BKLogSegmentEntryStore( - conf, - readerBKC.get(), - scheduler, - statsLogger, - failureInjector); - } + LogSegmentEntryStore getReaderEntryStore() { return this.readerEntryStore; } @@ -541,9 +538,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL subscriberId, conf, dynConf, - readerBKCBuilder, readerMetadataStore, logSegmentMetadataCache, + readerEntryStore, scheduler, alertStatsLogger, statsLogger, @@ -622,9 +619,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL final BKLogWriteHandler writeHandler = new BKLogWriteHandler( logMetadata, conf, - writerBKCBuilder, writerMetadataStore, logSegmentMetadataCache, + writerEntryStore, scheduler, allocator, statsLogger, @@ -821,33 +818,25 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL if (segmentIdx < 0) { return Future.value(new DLSN(segments.get(0).getLogSegmentSequenceNumber(), 0L, 0L)); } - final LedgerHandleCache handleCache = - LedgerHandleCache.newBuilder().bkc(readerBKC).conf(conf).build(); return getDLSNNotLessThanTxIdInSegment( fromTxnId, segmentIdx, segments, - handleCache - ).ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - handleCache.clear(); - return BoxedUnit.UNIT; - } - }); + readerEntryStore + ); } private Future<DLSN> getDLSNNotLessThanTxIdInSegment(final long fromTxnId, final int segmentIdx, final List<LogSegmentMetadata> segments, - final LedgerHandleCache handleCache) { + final LogSegmentEntryStore entryStore) { final LogSegmentMetadata segment = segments.get(segmentIdx); return ReadUtils.getLogRecordNotLessThanTxId( name, segment, fromTxnId, scheduler, - handleCache, + entryStore, Math.max(2, dynConf.getReadAheadBatchSize()) ).flatMap(new AbstractFunction1<Optional<LogRecordWithDLSN>, Future<DLSN>>() { @Override @@ -870,7 +859,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL fromTxnId, segmentIdx + 1, segments, - handleCache); + entryStore); } } }); @@ -915,7 +904,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL * </p> * * @see DLUtils#findLogSegmentNotLessThanTxnId(List, long) - * @see ReadUtils#getLogRecordNotLessThanTxId(String, LogSegmentMetadata, long, ExecutorService, LedgerHandleCache, int) + * @see ReadUtils#getLogRecordNotLessThanTxId(String, LogSegmentMetadata, long, ExecutorService, LogSegmentEntryStore, int) * @param fromTxnId * transaction id to start reading from * @return future representing the open result. http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java index e7f29cc..1a23228 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java @@ -43,7 +43,6 @@ import com.twitter.distributedlog.metadata.BKDLConfig; import com.twitter.distributedlog.metadata.LogMetadataStore; import com.twitter.distributedlog.metadata.LogStreamMetadataStore; import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger; import com.twitter.distributedlog.util.ConfUtils; import com.twitter.distributedlog.util.DLUtils; import com.twitter.distributedlog.util.FutureUtils; @@ -113,10 +112,6 @@ import static com.twitter.distributedlog.impl.BKDLUtils.*; * See {@link PermitLimiter}. * </ul> * - * <h4>ReadAhead Exceptions</h4> - * Stats about exceptions that encountered in ReadAhead are exposed under <code>`scope`/exceptions</code>. - * See {@link ReadAheadExceptionsLogger}. - * * <h4>DistributedLogManager</h4> * * All the core stats about reader and writer are exposed under current scope via {@link BKDistributedLogManager}. @@ -305,7 +300,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { // Stats Loggers private final StatsLogger statsLogger; private final StatsLogger perLogStatsLogger; - private final ReadAheadExceptionsLogger readAheadExceptionsLogger; protected AtomicBoolean closed = new AtomicBoolean(false); @@ -436,9 +430,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { allocator = null; } - // Stats Loggers - this.readAheadExceptionsLogger = new ReadAheadExceptionsLogger(statsLogger); - // log metadata store if (bkdlConfig.isFederatedNamespace() || conf.isFederatedNamespaceEnabled()) { this.metadataStore = new FederatedZKLogMetadataStore(conf, namespace, sharedReaderZKCForDL, scheduler); @@ -895,7 +886,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { readAheadExecutor, /* Read Aheader Executor */ channelFactory, /* Netty Channel Factory */ requestTimer, /* Request Timer */ - readAheadExceptionsLogger, /* ReadAhead Exceptions Logger */ clientId, /* Client Id */ regionId, /* Region Id */ dlmLedgerAlloctor, /* Ledger Allocator */ http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java index caee864..0cf8ed5 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java @@ -17,12 +17,12 @@ */ package com.twitter.distributedlog; -import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.twitter.distributedlog.callback.LogSegmentNamesListener; import com.twitter.distributedlog.exceptions.LogEmptyException; import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; import com.twitter.distributedlog.exceptions.UnexpectedException; +import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; import com.twitter.distributedlog.metadata.LogMetadata; import com.twitter.distributedlog.io.AsyncAbortable; import com.twitter.distributedlog.io.AsyncCloseable; @@ -45,8 +45,6 @@ import org.apache.bookkeeper.versioning.Versioned; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction0; -import scala.runtime.BoxedUnit; import java.io.IOException; import java.util.ArrayList; @@ -89,10 +87,10 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { protected final LogMetadata logMetadata; protected final DistributedLogConfiguration conf; - protected final BookKeeperClient bookKeeperClient; protected final LogStreamMetadataStore streamMetadataStore; protected final LogSegmentMetadataStore metadataStore; protected final LogSegmentMetadataCache metadataCache; + protected final LogSegmentEntryStore entryStore; protected final int firstNumEntriesPerReadLastRecordScan; protected final int maxNumEntriesPerReadLastRecordScan; protected volatile long lastLedgerRollingTimeMillis = -1; @@ -122,14 +120,13 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { */ BKLogHandler(LogMetadata metadata, DistributedLogConfiguration conf, - BookKeeperClientBuilder bkcBuilder, LogStreamMetadataStore streamMetadataStore, LogSegmentMetadataCache metadataCache, + LogSegmentEntryStore entryStore, OrderedScheduler scheduler, StatsLogger statsLogger, AlertStatsLogger alertStatsLogger, String lockClientId) { - Preconditions.checkNotNull(bkcBuilder); this.logMetadata = metadata; this.conf = conf; this.scheduler = scheduler; @@ -140,10 +137,10 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { conf.isLogSegmentSequenceNumberValidationEnabled()); firstNumEntriesPerReadLastRecordScan = conf.getFirstNumEntriesPerReadLastRecordScan(); maxNumEntriesPerReadLastRecordScan = conf.getMaxNumEntriesPerReadLastRecordScan(); - this.bookKeeperClient = bkcBuilder.build(); this.streamMetadataStore = streamMetadataStore; this.metadataStore = streamMetadataStore.getLogSegmentMetadataStore(); this.metadataCache = metadataCache; + this.entryStore = entryStore; this.lockClientId = lockClientId; // Traces @@ -293,8 +290,6 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { } private Future<LogRecordWithDLSN> asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) { - final LedgerHandleCache handleCache = - LedgerHandleCache.newBuilder().bkc(bookKeeperClient).conf(conf).build(); return ReadUtils.asyncReadFirstUserRecord( getFullyQualifiedName(), ledger, @@ -302,15 +297,9 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { maxNumEntriesPerReadLastRecordScan, new AtomicInteger(0), scheduler, - handleCache, + entryStore, beginDLSN - ).ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - handleCache.clear(); - return BoxedUnit.UNIT; - } - }); + ); } /** @@ -422,8 +411,6 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { final boolean includeEndOfStream) { final AtomicInteger numRecordsScanned = new AtomicInteger(0); final Stopwatch stopwatch = Stopwatch.createStarted(); - final LedgerHandleCache handleCache = - LedgerHandleCache.newBuilder().bkc(bookKeeperClient).conf(conf).build(); return ReadUtils.asyncReadLastRecord( getFullyQualifiedName(), l, @@ -434,7 +421,7 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { maxNumEntriesPerReadLastRecordScan, numRecordsScanned, scheduler, - handleCache + entryStore ).addEventListener(new FutureEventListener<LogRecordWithDLSN>() { @Override public void onSuccess(LogRecordWithDLSN value) { @@ -446,12 +433,6 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { public void onFailure(Throwable cause) { recoverLastEntryStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); } - }).ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - handleCache.clear(); - return BoxedUnit.UNIT; - } }); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java index 67c584c..8aa00e7 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java @@ -34,6 +34,7 @@ import com.twitter.distributedlog.exceptions.LockingException; import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; import com.twitter.distributedlog.exceptions.UnexpectedException; +import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; import com.twitter.distributedlog.metadata.LogMetadataForReader; import com.twitter.distributedlog.lock.DistributedLock; import com.twitter.distributedlog.logsegment.LogSegmentFilter; @@ -106,7 +107,6 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class); protected final LogMetadataForReader logMetadataForReader; - protected final LedgerHandleCache handleCache; protected final DynamicDistributedLogConfiguration dynConf; @@ -134,9 +134,9 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { Optional<String> subscriberId, DistributedLogConfiguration conf, DynamicDistributedLogConfiguration dynConf, - BookKeeperClientBuilder bkcBuilder, LogStreamMetadataStore streamMetadataStore, LogSegmentMetadataCache metadataCache, + LogSegmentEntryStore entryStore, OrderedScheduler scheduler, AlertStatsLogger alertStatsLogger, StatsLogger statsLogger, @@ -146,9 +146,9 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { boolean isHandleForReading) { super(logMetadata, conf, - bkcBuilder, streamMetadataStore, metadataCache, + entryStore, scheduler, statsLogger, alertStatsLogger, @@ -158,12 +158,6 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { this.perLogStatsLogger = isHandleForReading ? perLogStatsLogger : NullStatsLogger.INSTANCE; this.readerStateNotification = readerStateNotification; - - handleCache = LedgerHandleCache.newBuilder() - .bkc(this.bookKeeperClient) - .conf(conf) - .statsLogger(statsLogger) - .build(); this.subscriberId = subscriberId; } @@ -265,9 +259,6 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { .flatMap(new AbstractFunction1<Void, Future<Void>>() { @Override public Future<Void> apply(Void result) { - if (null != handleCache) { - handleCache.clear(); - } // unregister the log segment listener metadataStore.unregisterLogSegmentListener(logMetadata.getLogSegmentsPath(), BKLogReadHandler.this); return Future.Void(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java index 3f06700..25b25e2 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java @@ -30,6 +30,7 @@ import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException; import com.twitter.distributedlog.exceptions.UnexpectedException; import com.twitter.distributedlog.function.GetLastTxIdFunction; import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter; +import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; import com.twitter.distributedlog.metadata.LogMetadataForWriter; import com.twitter.distributedlog.lock.DistributedLock; import com.twitter.distributedlog.logsegment.LogSegmentFilter; @@ -52,8 +53,6 @@ import com.twitter.util.Function; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; import com.twitter.util.Promise; -import org.apache.bookkeeper.client.AsyncCallback; -import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.stats.AlertStatsLogger; @@ -151,9 +150,9 @@ class BKLogWriteHandler extends BKLogHandler { */ BKLogWriteHandler(LogMetadataForWriter logMetadata, DistributedLogConfiguration conf, - BookKeeperClientBuilder bkcBuilder, LogStreamMetadataStore streamMetadataStore, LogSegmentMetadataCache metadataCache, + LogSegmentEntryStore entryStore, OrderedScheduler scheduler, LedgerAllocator allocator, StatsLogger statsLogger, @@ -167,9 +166,9 @@ class BKLogWriteHandler extends BKLogHandler { DistributedLock lock /** owned by handler **/) { super(logMetadata, conf, - bkcBuilder, streamMetadataStore, metadataCache, + entryStore, scheduler, statsLogger, alertStatsLogger, @@ -1222,33 +1221,18 @@ class BKLogWriteHandler extends BKLogHandler { deleteOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); } }); - try { - bookKeeperClient.get().asyncDeleteLedger(ledgerMetadata.getLogSegmentId(), new AsyncCallback.DeleteCallback() { - @Override - public void deleteComplete(int rc, Object ctx) { - if (BKException.Code.NoSuchLedgerExistsException == rc) { - LOG.warn("No ledger {} found to delete for {} : {}.", - new Object[]{ledgerMetadata.getLogSegmentId(), getFullyQualifiedName(), - ledgerMetadata}); - } else if (BKException.Code.OK != rc) { - BKException bke = BKException.create(rc); - LOG.error("Couldn't delete ledger {} from bookkeeper for {} : ", - new Object[]{ledgerMetadata.getLogSegmentId(), getFullyQualifiedName(), bke}); - promise.setException(bke); - return; - } - // after the ledger is deleted, we delete the metadata znode - scheduler.submit(new Runnable() { - @Override - public void run() { - deleteLogSegmentMetadata(ledgerMetadata, promise); - } - }); - } - }, null); - } catch (IOException e) { - promise.setException(BKException.create(BKException.Code.BookieHandleNotAvailableException)); - } + entryStore.deleteLogSegment(ledgerMetadata) + .addEventListener(new FutureEventListener<LogSegmentMetadata>() { + @Override + public void onFailure(Throwable cause) { + FutureUtils.setException(promise, cause); + } + + @Override + public void onSuccess(LogSegmentMetadata segment) { + deleteLogSegmentMetadata(segment, promise); + } + }); return promise; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerDescriptor.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerDescriptor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerDescriptor.java deleted file mode 100644 index 5a95e46..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerDescriptor.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -public class LedgerDescriptor { - private final long ledgerId; - private final long logSegmentSequenceNo; - private final boolean fenced; - - public LedgerDescriptor(long ledgerId, long logSegmentSequenceNo, boolean fenced) { - this.ledgerId = ledgerId; - this.logSegmentSequenceNo = logSegmentSequenceNo; - this.fenced = fenced; - } - - public long getLedgerId() { - return ledgerId; - } - - public long getLogSegmentSequenceNo() { - return logSegmentSequenceNo; - } - - public boolean isFenced() { - return fenced; - } - - // Only compares the key portion - @Override - public boolean equals(Object other) { - if (!(other instanceof LedgerDescriptor)) { - return false; - } - LedgerDescriptor key = (LedgerDescriptor) other; - return ledgerId == key.ledgerId && - fenced == key.fenced; - } - - @Override - public int hashCode() { - return (int) (ledgerId * 13 ^ (fenced ? 0xFFFF : 0xF0F0) * 17); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("(lid=").append(ledgerId).append(", lseqno=").append(logSegmentSequenceNo) - .append(", fenced=").append(fenced).append(")"); - return sb.toString(); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerHandleCache.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerHandleCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerHandleCache.java deleted file mode 100644 index 49896fd..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerHandleCache.java +++ /dev/null @@ -1,463 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.client.AsyncCallback; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Enumeration; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import static com.google.common.base.Charsets.UTF_8; - -/** - * A central place on managing open ledgers. - */ -public class LedgerHandleCache { - static final Logger LOG = LoggerFactory.getLogger(LedgerHandleCache.class); - - public static Builder newBuilder() { - return new Builder(); - } - - public static class Builder { - - private BookKeeperClient bkc; - private String digestpw; - private StatsLogger statsLogger = NullStatsLogger.INSTANCE; - - private Builder() {} - - public Builder bkc(BookKeeperClient bkc) { - this.bkc = bkc; - return this; - } - - public Builder conf(DistributedLogConfiguration conf) { - this.digestpw = conf.getBKDigestPW(); - return this; - } - - public Builder statsLogger(StatsLogger statsLogger) { - this.statsLogger = statsLogger; - return this; - } - - public LedgerHandleCache build() { - Preconditions.checkNotNull(bkc, "No bookkeeper client is provided"); - Preconditions.checkNotNull(digestpw, "No bookkeeper digest password is provided"); - Preconditions.checkNotNull(statsLogger, "No stats logger is provided"); - return new LedgerHandleCache(bkc, digestpw, statsLogger); - } - } - - final ConcurrentHashMap<LedgerDescriptor, RefCountedLedgerHandle> handlesMap = - new ConcurrentHashMap<LedgerDescriptor, RefCountedLedgerHandle>(); - - private final BookKeeperClient bkc; - private final String digestpw; - - private final OpStatsLogger openStats; - private final OpStatsLogger openNoRecoveryStats; - - private LedgerHandleCache(BookKeeperClient bkc, String digestpw, StatsLogger statsLogger) { - this.bkc = bkc; - this.digestpw = digestpw; - // Stats - openStats = statsLogger.getOpStatsLogger("open_ledger"); - openNoRecoveryStats = statsLogger.getOpStatsLogger("open_ledger_no_recovery"); - } - - /** - * Open the given ledger <i>ledgerDesc</i>. - * - * @param ledgerDesc - * ledger description - * @param callback - * open callback. - * @param ctx - * callback context - */ - private void asyncOpenLedger(LedgerDescriptor ledgerDesc, AsyncCallback.OpenCallback callback, Object ctx) { - try { - if (!ledgerDesc.isFenced()) { - bkc.get().asyncOpenLedgerNoRecovery(ledgerDesc.getLedgerId(), - BookKeeper.DigestType.CRC32, digestpw.getBytes(UTF_8), callback, ctx); - } else { - bkc.get().asyncOpenLedger(ledgerDesc.getLedgerId(), - BookKeeper.DigestType.CRC32, digestpw.getBytes(UTF_8), callback, ctx); - } - } catch (IOException ace) { - // :) when we can't get bkc, it means bookie handle not available - callback.openComplete(BKException.Code.BookieHandleNotAvailableException, null, ctx); - } - } - - /** - * Open the log segment. - * - * @param metadata - * the log segment metadata - * @param fence - * whether to fence the log segment during open - * @return a future presenting the open result. - */ - public Future<LedgerDescriptor> asyncOpenLedger(LogSegmentMetadata metadata, boolean fence) { - final Stopwatch stopwatch = Stopwatch.createStarted(); - final OpStatsLogger openStatsLogger = fence ? openStats : openNoRecoveryStats; - final Promise<LedgerDescriptor> promise = new Promise<LedgerDescriptor>(); - final LedgerDescriptor ledgerDesc = new LedgerDescriptor(metadata.getLogSegmentId(), metadata.getLogSegmentSequenceNumber(), fence); - RefCountedLedgerHandle refhandle = handlesMap.get(ledgerDesc); - if (null == refhandle) { - asyncOpenLedger(ledgerDesc, new AsyncCallback.OpenCallback() { - @Override - public void openComplete(int rc, LedgerHandle lh, Object ctx) { - if (BKException.Code.OK != rc) { - promise.setException(BKException.create(rc)); - return; - } - RefCountedLedgerHandle newRefHandle = new RefCountedLedgerHandle(lh); - RefCountedLedgerHandle oldRefHandle = handlesMap.putIfAbsent(ledgerDesc, newRefHandle); - if (null != oldRefHandle) { - oldRefHandle.addRef(); - if (newRefHandle.removeRef()) { - newRefHandle.handle.asyncClose(new AsyncCallback.CloseCallback() { - @Override - public void closeComplete(int i, LedgerHandle ledgerHandle, Object o) { - // No action necessary - } - }, null); - } - } - promise.setValue(ledgerDesc); - } - }, null); - } else { - refhandle.addRef(); - promise.setValue(ledgerDesc); - } - return promise.addEventListener(new FutureEventListener<LedgerDescriptor>() { - @Override - public void onSuccess(LedgerDescriptor value) { - openStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } - - @Override - public void onFailure(Throwable cause) { - openStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } - }); - } - - /** - * Open a ledger synchronously. - * - * @param metadata - * log segment metadata - * @param fence - * whether to fence the log segment during open - * @return ledger descriptor - * @throws BKException - */ - public LedgerDescriptor openLedger(LogSegmentMetadata metadata, boolean fence) throws BKException { - return FutureUtils.bkResult(asyncOpenLedger(metadata, fence)); - } - - private RefCountedLedgerHandle getLedgerHandle(LedgerDescriptor ledgerDescriptor) { - return null == ledgerDescriptor ? null : handlesMap.get(ledgerDescriptor); - } - - /** - * Close the ledger asynchronously. - * - * @param ledgerDesc - * ledger descriptor. - * @return future presenting the closing result. - */ - public Future<Void> asyncCloseLedger(LedgerDescriptor ledgerDesc) { - final Promise<Void> promise = new Promise<Void>(); - - RefCountedLedgerHandle refhandle = getLedgerHandle(ledgerDesc); - if ((null != refhandle) && (refhandle.removeRef())) { - refhandle = handlesMap.remove(ledgerDesc); - if (refhandle.getRefCount() > 0) { - // In the rare race condition that a ref count was added immediately - // after the close de-refed it and the remove was called - - // Try to put the handle back in the map - handlesMap.putIfAbsent(ledgerDesc, refhandle); - - // ReadOnlyLedgerHandles don't have much overhead, so lets just leave - // the handle open even if it had already been replaced - promise.setValue(null); - } else { - refhandle.handle.asyncClose(new AsyncCallback.CloseCallback() { - @Override - public void closeComplete(int rc, LedgerHandle ledgerHandle, Object ctx) { - if (BKException.Code.OK == rc) { - promise.setValue(null); - } else { - promise.setException(BKException.create(rc)); - } - } - }, null); - } - } else { - promise.setValue(null); - } - return promise; - } - - /** - * Close the ledger synchronously. - * - * @param ledgerDesc - * ledger descriptor. - * @throws BKException - */ - public void closeLedger(LedgerDescriptor ledgerDesc) throws BKException { - FutureUtils.bkResult(asyncCloseLedger(ledgerDesc)); - } - - /** - * Get the last add confirmed of <code>ledgerDesc</code>. - * - * @param ledgerDesc - * ledger descriptor. - * @return last add confirmed of <code>ledgerDesc</code> - * @throws BKException - */ - public long getLastAddConfirmed(LedgerDescriptor ledgerDesc) throws BKException { - RefCountedLedgerHandle refhandle = getLedgerHandle(ledgerDesc); - - if (null == refhandle) { - LOG.error("Accessing ledger {} without opening.", ledgerDesc); - throw BKException.create(BKException.Code.UnexpectedConditionException); - } - - return refhandle.handle.getLastAddConfirmed(); - } - - /** - * Whether a ledger is closed or not. - * - * @param ledgerDesc - * ledger descriptor. - * @return true if a ledger is closed, otherwise false. - * @throws BKException - */ - public boolean isLedgerHandleClosed(LedgerDescriptor ledgerDesc) throws BKException { - RefCountedLedgerHandle refhandle = getLedgerHandle(ledgerDesc); - - if (null == refhandle) { - LOG.error("Accessing ledger {} without opening.", ledgerDesc); - throw BKException.create(BKException.Code.UnexpectedConditionException); - } - - return refhandle.handle.isClosed(); - } - - /** - * Async try read last confirmed. - * - * @param ledgerDesc - * ledger descriptor - * @return future presenting read last confirmed result. - */ - public Future<Long> asyncTryReadLastConfirmed(LedgerDescriptor ledgerDesc) { - RefCountedLedgerHandle refHandle = handlesMap.get(ledgerDesc); - if (null == refHandle) { - LOG.error("Accessing ledger {} without opening.", ledgerDesc); - return Future.exception(BKException.create(BKException.Code.UnexpectedConditionException)); - } - final Promise<Long> promise = new Promise<Long>(); - refHandle.handle.asyncTryReadLastConfirmed(new AsyncCallback.ReadLastConfirmedCallback() { - @Override - public void readLastConfirmedComplete(int rc, long lastAddConfirmed, Object ctx) { - if (BKException.Code.OK == rc) { - promise.setValue(lastAddConfirmed); - } else { - promise.setException(BKException.create(rc)); - } - } - }, null); - return promise; - } - - /** - * Try read last confirmed. - * - * @param ledgerDesc - * ledger descriptor - * @return last confirmed - * @throws BKException - */ - public long tryReadLastConfirmed(LedgerDescriptor ledgerDesc) throws BKException { - return FutureUtils.bkResult(asyncTryReadLastConfirmed(ledgerDesc)); - } - - /** - * Async read last confirmed and entry - * - * @param ledgerDesc - * ledger descriptor - * @param entryId - * entry id to read - * @param timeOutInMillis - * time out if no newer entry available - * @param parallel - * whether to read from replicas in parallel - */ - public Future<Pair<Long, LedgerEntry>> asyncReadLastConfirmedAndEntry( - LedgerDescriptor ledgerDesc, - long entryId, - long timeOutInMillis, - boolean parallel) { - RefCountedLedgerHandle refHandle = handlesMap.get(ledgerDesc); - if (null == refHandle) { - LOG.error("Accessing ledger {} without opening.", ledgerDesc); - return Future.exception(BKException.create(BKException.Code.UnexpectedConditionException)); - } - final Promise<Pair<Long, LedgerEntry>> promise = new Promise<Pair<Long, LedgerEntry>>(); - refHandle.handle.asyncReadLastConfirmedAndEntry(entryId, timeOutInMillis, parallel, - new AsyncCallback.ReadLastConfirmedAndEntryCallback() { - @Override - public void readLastConfirmedAndEntryComplete(int rc, long lac, LedgerEntry ledgerEntry, Object ctx) { - if (BKException.Code.OK == rc) { - promise.setValue(Pair.of(lac, ledgerEntry)); - } else { - promise.setException(BKException.create(rc)); - } - } - }, null); - return promise; - } - - /** - * Async Read Entries - * - * @param ledgerDesc - * ledger descriptor - * @param first - * first entry - * @param last - * second entry - */ - public Future<Enumeration<LedgerEntry>> asyncReadEntries( - LedgerDescriptor ledgerDesc, long first, long last) { - RefCountedLedgerHandle refHandle = handlesMap.get(ledgerDesc); - if (null == refHandle) { - LOG.error("Accessing ledger {} without opening.", ledgerDesc); - return Future.exception(BKException.create(BKException.Code.UnexpectedConditionException)); - } - final Promise<Enumeration<LedgerEntry>> promise = new Promise<Enumeration<LedgerEntry>>(); - refHandle.handle.asyncReadEntries(first, last, new AsyncCallback.ReadCallback() { - @Override - public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) { - if (BKException.Code.OK == rc) { - promise.setValue(entries); - } else { - promise.setException(BKException.create(rc)); - } - } - }, null); - return promise; - } - - public Enumeration<LedgerEntry> readEntries(LedgerDescriptor ledgerDesc, long first, long last) - throws BKException { - return FutureUtils.bkResult(asyncReadEntries(ledgerDesc, first, last)); - } - - public long getLength(LedgerDescriptor ledgerDesc) throws BKException { - RefCountedLedgerHandle refhandle = getLedgerHandle(ledgerDesc); - - if (null == refhandle) { - LOG.error("Accessing ledger {} without opening.", ledgerDesc); - throw BKException.create(BKException.Code.UnexpectedConditionException); - } - - return refhandle.handle.getLength(); - } - - public void clear() { - if (null != handlesMap) { - Iterator<Map.Entry<LedgerDescriptor, RefCountedLedgerHandle>> handlesMapIter = handlesMap.entrySet().iterator(); - while (handlesMapIter.hasNext()) { - Map.Entry<LedgerDescriptor, RefCountedLedgerHandle> entry = handlesMapIter.next(); - // Make it inaccessible through the map - handlesMapIter.remove(); - // now close the ledger - entry.getValue().forceClose(); - } - } - } - - static class RefCountedLedgerHandle { - public final LedgerHandle handle; - final AtomicLong refcount = new AtomicLong(0); - - RefCountedLedgerHandle(LedgerHandle lh) { - this.handle = lh; - addRef(); - } - - long getRefCount() { - return refcount.get(); - } - - public void addRef() { - refcount.incrementAndGet(); - } - - public boolean removeRef() { - return (refcount.decrementAndGet() == 0); - } - - public void forceClose() { - try { - handle.close(); - } catch (BKException.BKLedgerClosedException exc) { - // Ignore - } catch (Exception exc) { - LOG.warn("Exception while closing ledger {}", handle, exc); - } - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java deleted file mode 100644 index 284b327..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadCache.java +++ /dev/null @@ -1,233 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import java.io.IOException; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import com.google.common.base.Stopwatch; -import com.google.common.base.Ticker; -import com.twitter.distributedlog.callback.ReadAheadCallback; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException; -import com.twitter.distributedlog.exceptions.LogReadException; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.stats.AlertStatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ReadAheadCache { - static final Logger LOG = LoggerFactory.getLogger(ReadAheadCache.class); - - private final String streamName; - private final LinkedBlockingQueue<Entry.Reader> readAheadEntries; - private final int maxCachedEntries; - private final AtomicReference<IOException> lastException = new AtomicReference<IOException>(); - private final boolean deserializeRecordSet; - // callbacks - private final AsyncNotification notification; - private ReadAheadCallback readAheadCallback = null; - - // variables for idle reader detection - private final Stopwatch lastEntryProcessTime; - - private final AlertStatsLogger alertStatsLogger; - - public ReadAheadCache(String streamName, - AlertStatsLogger alertStatsLogger, - AsyncNotification notification, - int maxCachedRecords, - boolean deserializeRecordSet, - Ticker ticker) { - this.streamName = streamName; - this.maxCachedEntries = maxCachedRecords; - this.notification = notification; - this.deserializeRecordSet = deserializeRecordSet; - - // create the readahead queue - readAheadEntries = new LinkedBlockingQueue<Entry.Reader>(); - - // start the idle reader detection - lastEntryProcessTime = Stopwatch.createStarted(ticker); - - // Stats - this.alertStatsLogger = alertStatsLogger; - } - - /** - * Trigger read ahead callback - */ - private synchronized void invokeReadAheadCallback() { - if (null != readAheadCallback) { - if (LOG.isTraceEnabled()) { - LOG.trace("Cache has space, schedule the read ahead"); - } - readAheadCallback.resumeReadAhead(); - readAheadCallback = null; - } - } - - /** - * Register a readhead callback. - * - * @param readAheadCallback - * read ahead callback - */ - public synchronized void setReadAheadCallback(ReadAheadCallback readAheadCallback) { - this.readAheadCallback = readAheadCallback; - if (!isCacheFull()) { - invokeReadAheadCallback(); - } - } - - private void setLastException(IOException exc) { - lastException.set(exc); - } - - /** - * Poll next entry from the readahead queue. - * - * @return next entry from readahead queue. null if no entries available in the queue. - * @throws IOException - */ - public Entry.Reader getNextReadAheadEntry() throws IOException { - return getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS); - } - - public Entry.Reader getNextReadAheadEntry(long waitTime, TimeUnit waitTimeUnit) throws IOException { - if (null != lastException.get()) { - throw lastException.get(); - } - - Entry.Reader entry = null; - try { - entry = readAheadEntries.poll(waitTime, waitTimeUnit); - } catch (InterruptedException e) { - throw new DLInterruptedException("Interrupted on polling readahead entries : ", e); - } - - if (null != entry) { - if (!isCacheFull()) { - invokeReadAheadCallback(); - } - } - - return entry; - } - - /** - * Check whether the readahead becomes stall. - * - * @param idleReaderErrorThreshold - * idle reader error threshold - * @param timeUnit - * time unit of the idle reader error threshold - * @return true if the readahead becomes stall, otherwise false. - */ - public boolean isReadAheadIdle(int idleReaderErrorThreshold, TimeUnit timeUnit) { - return (lastEntryProcessTime.elapsed(timeUnit) > idleReaderErrorThreshold); - } - - /** - * Set an ledger entry to readahead cache - * - * @param key - * read position of the entry - * @param entry - * the ledger entry - * @param reason - * the reason to add the entry to readahead (for logging) - * @param envelopeEntries - * whether this entry an enveloped entries or not - * @param startSequenceId - * the start sequence id - */ - public void set(LedgerReadPosition key, - LedgerEntry entry, - String reason, - boolean envelopeEntries, - long startSequenceId) { - processNewLedgerEntry(key, entry, reason, envelopeEntries, startSequenceId); - lastEntryProcessTime.reset().start(); - AsyncNotification n = notification; - if (null != n) { - n.notifyOnOperationComplete(); - } - } - - public boolean isCacheFull() { - return getNumCachedEntries() >= maxCachedEntries; - } - - /** - * Return number cached records. - * - * @return number cached records. - */ - public int getNumCachedEntries() { - return readAheadEntries.size(); - } - - /** - * Process the new ledger entry and propagate the records into readahead queue. - * - * @param readPosition - * position of the ledger entry - * @param ledgerEntry - * ledger entry - * @param reason - * reason to add this ledger entry - * @param envelopeEntries - * whether this entry is enveloped - * @param startSequenceId - * the start sequence id of this log segment - */ - private void processNewLedgerEntry(final LedgerReadPosition readPosition, - final LedgerEntry ledgerEntry, - final String reason, - boolean envelopeEntries, - long startSequenceId) { - try { - Entry.Reader reader = Entry.newBuilder() - .setLogSegmentInfo(readPosition.getLogSegmentSequenceNumber(), startSequenceId) - .setEntryId(ledgerEntry.getEntryId()) - .setEnvelopeEntry(envelopeEntries) - .deserializeRecordSet(deserializeRecordSet) - .setInputStream(ledgerEntry.getEntryInputStream()) - .buildReader(); - readAheadEntries.add(reader); - } catch (InvalidEnvelopedEntryException ieee) { - alertStatsLogger.raise("Found invalid enveloped entry on stream {} : ", streamName, ieee); - setLastException(ieee); - } catch (IOException exc) { - setLastException(exc); - } - } - - public void clear() { - readAheadEntries.clear(); - } - - @Override - public String toString() { - return String.format("%s: Num Cached Entries: %d", - streamName, getNumCachedEntries()); - } -}