http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java index 97f694f..f481561 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java @@ -18,29 +18,25 @@ package com.twitter.distributedlog; import java.io.IOException; -import java.util.Enumeration; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; +import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader; import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.twitter.distributedlog.selector.FirstDLSNNotLessThanSelector; import com.twitter.distributedlog.selector.FirstTxIdNotLessThanSelector; import com.twitter.distributedlog.selector.LastRecordSelector; import com.twitter.distributedlog.selector.LogRecordSelector; -import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.FutureUtils.FutureEventListenerRunnable; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; import com.twitter.util.Promise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.runtime.AbstractFunction0; import scala.runtime.BoxedUnit; @@ -58,14 +54,14 @@ public class ReadUtils { 
// /** - * Read last record from a ledger. + * Read last record from a log segment. * * @param streamName * fully qualified stream name (used for logging) * @param l - * ledger descriptor. + * log segment metadata. * @param fence - * whether to fence the ledger. + * whether to fence the log segment. * @param includeControl * whether to include control record. * @param includeEndOfStream @@ -78,8 +74,8 @@ public class ReadUtils { * num of records scanned to get last record * @param executorService * executor service used for processing entries - * @param handleCache - * ledger handle cache + * @param entryStore + * log segment entry store * @return a future with last record. */ public static Future<LogRecordWithDLSN> asyncReadLastRecord( @@ -92,20 +88,20 @@ public class ReadUtils { final int scanMaxBatchSize, final AtomicInteger numRecordsScanned, final ExecutorService executorService, - final LedgerHandleCache handleCache) { + final LogSegmentEntryStore entryStore) { final LogRecordSelector selector = new LastRecordSelector(); return asyncReadRecord(streamName, l, fence, includeControl, includeEndOfStream, scanStartBatchSize, - scanMaxBatchSize, numRecordsScanned, executorService, handleCache, + scanMaxBatchSize, numRecordsScanned, executorService, entryStore, selector, true /* backward */, 0L); } /** - * Read first record from a ledger with a DLSN larger than that given. + * Read first record from a log segment with a DLSN larger than that given. * * @param streamName * fully qualified stream name (used for logging) * @param l - * ledger descriptor. + * log segment metadata. * @param scanStartBatchSize * first num entries used for read last record scan * @param scanMaxBatchSize @@ -114,6 +110,8 @@ public class ReadUtils { * num of records scanned to get last record * @param executorService * executor service used for processing entries + * @param entryStore + * log segment entry store * @param dlsn * threshold dlsn * @return a future with last record. 
@@ -125,7 +123,7 @@ public class ReadUtils { final int scanMaxBatchSize, final AtomicInteger numRecordsScanned, final ExecutorService executorService, - final LedgerHandleCache handleCache, + final LogSegmentEntryStore entryStore, final DLSN dlsn) { long startEntryId = 0L; if (l.getLogSegmentSequenceNumber() == dlsn.getLogSegmentSequenceNo()) { @@ -133,7 +131,7 @@ public class ReadUtils { } final LogRecordSelector selector = new FirstDLSNNotLessThanSelector(dlsn); return asyncReadRecord(streamName, l, false, false, false, scanStartBatchSize, - scanMaxBatchSize, numRecordsScanned, executorService, handleCache, + scanMaxBatchSize, numRecordsScanned, executorService, entryStore, selector, false /* backward */, startEntryId); } @@ -233,14 +231,12 @@ public class ReadUtils { } /** - * Read record from a given range of ledger entries. + * Read record from a given range of log segment entries. * * @param streamName * fully qualified stream name (used for logging) - * @param ledgerDescriptor - * ledger descriptor. - * @param handleCache - * ledger handle cache. 
+ * @param reader + * log segment random access reader * @param executorService * executor service used for processing entries * @param context @@ -249,8 +245,7 @@ public class ReadUtils { */ private static Future<LogRecordWithDLSN> asyncReadRecordFromEntries( final String streamName, - final LedgerDescriptor ledgerDescriptor, - LedgerHandleCache handleCache, + final LogSegmentRandomAccessEntryReader reader, final LogSegmentMetadata metadata, final ExecutorService executorService, final ScanContext context, @@ -260,22 +255,19 @@ public class ReadUtils { final long endEntryId = context.curEndEntryId.get(); if (LOG.isDebugEnabled()) { LOG.debug("{} reading entries [{} - {}] from {}.", - new Object[] { streamName, startEntryId, endEntryId, ledgerDescriptor }); + new Object[] { streamName, startEntryId, endEntryId, metadata}); } - FutureEventListener<Enumeration<LedgerEntry>> readEntriesListener = - new FutureEventListener<Enumeration<LedgerEntry>>() { + FutureEventListener<List<Entry.Reader>> readEntriesListener = + new FutureEventListener<List<Entry.Reader>>() { @Override - public void onSuccess(final Enumeration<LedgerEntry> entries) { + public void onSuccess(final List<Entry.Reader> entries) { if (LOG.isDebugEnabled()) { LOG.debug("{} finished reading entries [{} - {}] from {}", - new Object[]{ streamName, startEntryId, endEntryId, ledgerDescriptor }); + new Object[]{ streamName, startEntryId, endEntryId, metadata}); } - LogRecordWithDLSN record = null; - while (entries.hasMoreElements()) { - LedgerEntry entry = entries.nextElement(); + for (Entry.Reader entry : entries) { try { - visitEntryRecords( - streamName, metadata, ledgerDescriptor.getLogSegmentSequenceNo(), entry, context, selector); + visitEntryRecords(entry, context, selector); } catch (IOException ioe) { // exception is only thrown due to bad ledger entry, so it might be corrupted // we shouldn't do anything beyond this point. 
throw the exception to application @@ -284,24 +276,21 @@ public class ReadUtils { } } - record = selector.result(); + LogRecordWithDLSN record = selector.result(); if (LOG.isDebugEnabled()) { LOG.debug("{} got record from entries [{} - {}] of {} : {}", new Object[]{streamName, startEntryId, endEntryId, - ledgerDescriptor, record}); + metadata, record}); } promise.setValue(record); } @Override public void onFailure(final Throwable cause) { - String errMsg = "Error reading entries [" + startEntryId + "-" + endEntryId - + "] for reading record of " + streamName; - promise.setException(new IOException(errMsg, - BKException.create(FutureUtils.bkResultCode(cause)))); + promise.setException(cause); } }; - handleCache.asyncReadEntries(ledgerDescriptor, startEntryId, endEntryId) + reader.readEntries(startEntryId, endEntryId) .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService)); return promise; } @@ -309,10 +298,6 @@ public class ReadUtils { /** * Process each record using LogRecordSelector. 
* - * @param streamName - * fully qualified stream name (used for logging) - * @param logSegmentSeqNo - * ledger sequence number * @param entry * ledger entry * @param context @@ -321,22 +306,13 @@ public class ReadUtils { * @throws IOException */ private static void visitEntryRecords( - String streamName, - LogSegmentMetadata metadata, - long logSegmentSeqNo, - LedgerEntry entry, + Entry.Reader entry, ScanContext context, LogRecordSelector selector) throws IOException { - Entry.Reader reader = Entry.newBuilder() - .setLogSegmentInfo(logSegmentSeqNo, metadata.getStartSequenceId()) - .setEntryId(entry.getEntryId()) - .setEnvelopeEntry(metadata.getEnvelopeEntries()) - .setInputStream(entry.getEntryInputStream()) - .buildReader(); - LogRecordWithDLSN nextRecord = reader.nextRecord(); + LogRecordWithDLSN nextRecord = entry.nextRecord(); while (nextRecord != null) { LogRecordWithDLSN record = nextRecord; - nextRecord = reader.nextRecord(); + nextRecord = entry.nextRecord(); context.numRecordsScanned.incrementAndGet(); if (!context.includeControl && record.isControl()) { continue; @@ -353,10 +329,8 @@ public class ReadUtils { * * @param streamName * fully qualified stream name (used for logging) - * @param ledgerDescriptor - * ledger descriptor. - * @param handleCache - * ledger handle cache. 
+ * @param reader + * log segment random access reader * @param executorService * executor service used for processing entries * @param promise @@ -366,8 +340,7 @@ public class ReadUtils { */ private static void asyncReadRecordFromEntries( final String streamName, - final LedgerDescriptor ledgerDescriptor, - final LedgerHandleCache handleCache, + final LogSegmentRandomAccessEntryReader reader, final LogSegmentMetadata metadata, final ExecutorService executorService, final Promise<LogRecordWithDLSN> promise, @@ -380,7 +353,7 @@ public class ReadUtils { if (LOG.isDebugEnabled()) { LOG.debug("{} read record from [{} - {}] of {} : {}", new Object[]{streamName, context.curStartEntryId.get(), context.curEndEntryId.get(), - ledgerDescriptor, value}); + metadata, value}); } if (null != value) { promise.setValue(value); @@ -393,8 +366,7 @@ public class ReadUtils { } // scan next range asyncReadRecordFromEntries(streamName, - ledgerDescriptor, - handleCache, + reader, metadata, executorService, promise, @@ -407,14 +379,13 @@ public class ReadUtils { promise.setException(cause); } }; - asyncReadRecordFromEntries(streamName, ledgerDescriptor, handleCache, metadata, executorService, context, selector) + asyncReadRecordFromEntries(streamName, reader, metadata, executorService, context, selector) .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService)); } private static void asyncReadRecordFromLogSegment( final String streamName, - final LedgerDescriptor ledgerDescriptor, - final LedgerHandleCache handleCache, + final LogSegmentRandomAccessEntryReader reader, final LogSegmentMetadata metadata, final ExecutorService executorService, final int scanStartBatchSize, @@ -426,16 +397,10 @@ public class ReadUtils { final LogRecordSelector selector, final boolean backward, final long startEntryId) { - final long lastAddConfirmed; - try { - lastAddConfirmed = handleCache.getLastAddConfirmed(ledgerDescriptor); - } catch (BKException e) { - 
promise.setException(e); - return; - } + final long lastAddConfirmed = reader.getLastAddConfirmed(); if (lastAddConfirmed < 0) { if (LOG.isDebugEnabled()) { - LOG.debug("Ledger {} is empty for {}.", new Object[] { ledgerDescriptor, streamName }); + LOG.debug("Log segment {} is empty for {}.", new Object[] { metadata, streamName }); } promise.setValue(null); return; @@ -444,7 +409,7 @@ public class ReadUtils { startEntryId, lastAddConfirmed, scanStartBatchSize, scanMaxBatchSize, includeControl, includeEndOfStream, backward, numRecordsScanned); - asyncReadRecordFromEntries(streamName, ledgerDescriptor, handleCache, metadata, executorService, + asyncReadRecordFromEntries(streamName, reader, metadata, executorService, promise, context, selector); } @@ -458,25 +423,25 @@ public class ReadUtils { final int scanMaxBatchSize, final AtomicInteger numRecordsScanned, final ExecutorService executorService, - final LedgerHandleCache handleCache, + final LogSegmentEntryStore entryStore, final LogRecordSelector selector, final boolean backward, final long startEntryId) { final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>(); - FutureEventListener<LedgerDescriptor> openLedgerListener = - new FutureEventListener<LedgerDescriptor>() { + FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener = + new FutureEventListener<LogSegmentRandomAccessEntryReader>() { @Override - public void onSuccess(final LedgerDescriptor ledgerDescriptor) { + public void onSuccess(final LogSegmentRandomAccessEntryReader reader) { if (LOG.isDebugEnabled()) { - LOG.debug("{} Opened logsegment {} for reading record", + LOG.debug("{} Opened log segment {} for reading record", streamName, l); } promise.ensure(new AbstractFunction0<BoxedUnit>() { @Override public BoxedUnit apply() { - handleCache.asyncCloseLedger(ledgerDescriptor); + reader.asyncClose(); return BoxedUnit.UNIT; } }); @@ -485,7 +450,7 @@ public class ReadUtils { (backward ? 
"backward" : "forward"), streamName, l}); } asyncReadRecordFromLogSegment( - streamName, ledgerDescriptor, handleCache, l, executorService, + streamName, reader, l, executorService, scanStartBatchSize, scanMaxBatchSize, includeControl, includeEndOfStream, promise, numRecordsScanned, selector, backward, startEntryId); @@ -493,13 +458,11 @@ public class ReadUtils { @Override public void onFailure(final Throwable cause) { - String errMsg = "Error opening log segment [" + l + "] for reading record of " + streamName; - promise.setException(new IOException(errMsg, - BKException.create(FutureUtils.bkResultCode(cause)))); + promise.setException(cause); } }; - handleCache.asyncOpenLedger(l, fence) - .addEventListener(FutureEventListenerRunnable.of(openLedgerListener, executorService)); + entryStore.openRandomAccessReader(l, fence) + .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService)); return promise; } @@ -530,8 +493,8 @@ public class ReadUtils { * transaction id * @param executorService * executor service used for processing entries - * @param handleCache - * ledger handle cache + * @param entryStore + * log segment entry store * @param nWays * how many number of entries to search in parallel * @return found log record. none if all transaction ids are less than provided <code>transactionId</code>. 
@@ -541,7 +504,7 @@ public class ReadUtils { final LogSegmentMetadata segment, final long transactionId, final ExecutorService executorService, - final LedgerHandleCache handleCache, + final LogSegmentEntryStore entryStore, final int nWays) { if (!segment.isInProgress()) { if (segment.getLastTxId() < transactionId) { @@ -554,25 +517,19 @@ public class ReadUtils { final Promise<Optional<LogRecordWithDLSN>> promise = new Promise<Optional<LogRecordWithDLSN>>(); - final FutureEventListener<LedgerDescriptor> openLedgerListener = - new FutureEventListener<LedgerDescriptor>() { + final FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener = + new FutureEventListener<LogSegmentRandomAccessEntryReader>() { @Override - public void onSuccess(final LedgerDescriptor ld) { + public void onSuccess(final LogSegmentRandomAccessEntryReader reader) { promise.ensure(new AbstractFunction0<BoxedUnit>() { @Override public BoxedUnit apply() { - handleCache.asyncCloseLedger(ld); + reader.asyncClose(); return BoxedUnit.UNIT; } }); - long lastEntryId; - try { - lastEntryId = handleCache.getLastAddConfirmed(ld); - } catch (BKException e) { - promise.setException(e); - return; - } + long lastEntryId = reader.getLastAddConfirmed(); if (lastEntryId < 0) { // it means that the log segment is created but not written yet or an empty log segment. 
// it is equivalent to 'all log records whose transaction id is less than provided transactionId' @@ -586,8 +543,7 @@ public class ReadUtils { new FirstTxIdNotLessThanSelector(transactionId); asyncReadRecordFromEntries( logName, - ld, - handleCache, + reader, segment, executorService, new SingleEntryScanContext(0L), @@ -608,11 +564,10 @@ public class ReadUtils { } getLogRecordNotLessThanTxIdFromEntries( logName, - ld, segment, transactionId, executorService, - handleCache, + reader, Lists.newArrayList(0L, lastEntryId), nWays, Optional.<LogRecordWithDLSN>absent(), @@ -621,14 +576,12 @@ public class ReadUtils { @Override public void onFailure(final Throwable cause) { - String errMsg = "Error opening log segment [" + segment - + "] for find record from " + logName; - promise.setException(new IOException(errMsg, - BKException.create(FutureUtils.bkResultCode(cause)))); + promise.setException(cause); } }; - handleCache.asyncOpenLedger(segment, false) - .addEventListener(FutureEventListenerRunnable.of(openLedgerListener, executorService)); + + entryStore.openRandomAccessReader(segment, false) + .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService)); return promise; } @@ -644,8 +597,8 @@ public class ReadUtils { * provided transaction id to search * @param executorService * executor service - * @param handleCache - * handle cache + * @param reader + * log segment random access reader * @param entriesToSearch * list of entries to search * @param nWays @@ -657,11 +610,10 @@ public class ReadUtils { */ private static void getLogRecordNotLessThanTxIdFromEntries( final String logName, - final LedgerDescriptor ld, final LogSegmentMetadata segment, final long transactionId, final ExecutorService executorService, - final LedgerHandleCache handleCache, + final LogSegmentRandomAccessEntryReader reader, final List<Long> entriesToSearch, final int nWays, final Optional<LogRecordWithDLSN> prevFoundRecord, @@ -672,8 +624,7 @@ public class ReadUtils { 
LogRecordSelector selector = new FirstTxIdNotLessThanSelector(transactionId); Future<LogRecordWithDLSN> searchResult = asyncReadRecordFromEntries( logName, - ld, - handleCache, + reader, segment, executorService, new SingleEntryScanContext(entryId), @@ -686,11 +637,10 @@ public class ReadUtils { public void onSuccess(List<LogRecordWithDLSN> resultList) { processSearchResults( logName, - ld, segment, transactionId, executorService, - handleCache, + reader, resultList, nWays, prevFoundRecord, @@ -711,11 +661,10 @@ public class ReadUtils { */ static void processSearchResults( final String logName, - final LedgerDescriptor ld, final LogSegmentMetadata segment, final long transactionId, final ExecutorService executorService, - final LedgerHandleCache handleCache, + final LogSegmentRandomAccessEntryReader reader, final List<LogRecordWithDLSN> searchResults, final int nWays, final Optional<LogRecordWithDLSN> prevFoundRecord, @@ -758,11 +707,10 @@ public class ReadUtils { } getLogRecordNotLessThanTxIdFromEntries( logName, - ld, segment, transactionId, executorService, - handleCache, + reader, nextSearchBatch, nWays, Optional.of(foundRecord),
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java index 22b81a1..0a3fdb0 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java @@ -21,7 +21,6 @@ import com.google.common.base.Preconditions; import com.twitter.distributedlog.BookKeeperClient; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.DistributedLogManager; -import com.twitter.distributedlog.LedgerHandleCache; import com.twitter.distributedlog.LogRecordWithDLSN; import com.twitter.distributedlog.LogSegmentMetadata; import com.twitter.distributedlog.ReadUtils; @@ -30,6 +29,9 @@ import com.twitter.distributedlog.ZooKeeperClientBuilder; import com.twitter.distributedlog.acl.ZKAccessControl; import com.twitter.distributedlog.exceptions.DLIllegalStateException; import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore; +import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore; +import com.twitter.distributedlog.injector.AsyncFailureInjector; +import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; import com.twitter.distributedlog.metadata.BKDLConfig; import com.twitter.distributedlog.metadata.DLMetadata; import com.twitter.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater; @@ -39,10 +41,12 @@ import com.twitter.distributedlog.thrift.AccessControlEntry; import com.twitter.distributedlog.tools.DistributedLogTool; import com.twitter.distributedlog.util.DLUtils; import 
com.twitter.distributedlog.util.FutureUtils; +import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.SchedulerUtils; import com.twitter.util.Await; import com.twitter.util.Function; import com.twitter.util.Future; +import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.IOUtils; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; @@ -50,8 +54,6 @@ import org.apache.commons.cli.ParseException; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction0; -import scala.runtime.BoxedUnit; import java.io.IOException; import java.net.URI; @@ -194,18 +196,18 @@ public class DistributedLogAdmin extends DistributedLogTool { public static void checkAndRepairDLNamespace(final URI uri, final com.twitter.distributedlog.DistributedLogManagerFactory factory, final MetadataUpdater metadataUpdater, - final ExecutorService executorService, + final OrderedScheduler scheduler, final BookKeeperClient bkc, final String digestpw, final boolean verbose, final boolean interactive) throws IOException { - checkAndRepairDLNamespace(uri, factory, metadataUpdater, executorService, bkc, digestpw, verbose, interactive, 1); + checkAndRepairDLNamespace(uri, factory, metadataUpdater, scheduler, bkc, digestpw, verbose, interactive, 1); } public static void checkAndRepairDLNamespace(final URI uri, final com.twitter.distributedlog.DistributedLogManagerFactory factory, final MetadataUpdater metadataUpdater, - final ExecutorService executorService, + final OrderedScheduler scheduler, final BookKeeperClient bkc, final String digestpw, final boolean verbose, @@ -222,7 +224,7 @@ public class DistributedLogAdmin extends DistributedLogTool { return; } Map<String, StreamCandidate> streamCandidates = - checkStreams(factory, streams, executorService, bkc, digestpw, concurrency); + checkStreams(factory, streams, scheduler, bkc, 
digestpw, concurrency); if (verbose) { System.out.println("+ 0. " + streamCandidates.size() + " corrupted streams found."); } @@ -248,7 +250,7 @@ public class DistributedLogAdmin extends DistributedLogTool { private static Map<String, StreamCandidate> checkStreams( final com.twitter.distributedlog.DistributedLogManagerFactory factory, final Collection<String> streams, - final ExecutorService executorService, + final OrderedScheduler scheduler, final BookKeeperClient bkc, final String digestpw, final int concurrency) throws IOException { @@ -273,7 +275,7 @@ public class DistributedLogAdmin extends DistributedLogTool { StreamCandidate candidate; try { LOG.info("Checking stream {}.", stream); - candidate = checkStream(factory, stream, executorService, bkc, digestpw); + candidate = checkStream(factory, stream, scheduler, bkc, digestpw); LOG.info("Checked stream {} - {}.", stream, candidate); } catch (IOException e) { LOG.error("Error on checking stream {} : ", stream, e); @@ -316,7 +318,7 @@ public class DistributedLogAdmin extends DistributedLogTool { private static StreamCandidate checkStream( final com.twitter.distributedlog.DistributedLogManagerFactory factory, final String streamName, - final ExecutorService executorService, + final OrderedScheduler scheduler, final BookKeeperClient bkc, String digestpw) throws IOException { DistributedLogManager dlm = factory.createDistributedLogManagerWithSharedClients(streamName); @@ -328,7 +330,7 @@ public class DistributedLogAdmin extends DistributedLogTool { List<Future<LogSegmentCandidate>> futures = new ArrayList<Future<LogSegmentCandidate>>(segments.size()); for (LogSegmentMetadata segment : segments) { - futures.add(checkLogSegment(streamName, segment, executorService, bkc, digestpw)); + futures.add(checkLogSegment(streamName, segment, scheduler, bkc, digestpw)); } List<LogSegmentCandidate> segmentCandidates; try { @@ -354,17 +356,19 @@ public class DistributedLogAdmin extends DistributedLogTool { private static 
Future<LogSegmentCandidate> checkLogSegment( final String streamName, final LogSegmentMetadata metadata, - final ExecutorService executorService, + final OrderedScheduler scheduler, final BookKeeperClient bkc, final String digestpw) { if (metadata.isInProgress()) { return Future.value(null); } - final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder() - .bkc(bkc) - .conf(new DistributedLogConfiguration().setBKDigestPW(digestpw)) - .build(); + final LogSegmentEntryStore entryStore = new BKLogSegmentEntryStore( + new DistributedLogConfiguration().setBKDigestPW(digestpw), + bkc, + scheduler, + NullStatsLogger.INSTANCE, + AsyncFailureInjector.NULL); return ReadUtils.asyncReadLastRecord( streamName, metadata, @@ -374,8 +378,8 @@ public class DistributedLogAdmin extends DistributedLogTool { 4, 16, new AtomicInteger(0), - executorService, - handleCache + scheduler, + entryStore ).map(new Function<LogRecordWithDLSN, LogSegmentCandidate>() { @Override public LogSegmentCandidate apply(LogRecordWithDLSN record) { @@ -388,12 +392,6 @@ public class DistributedLogAdmin extends DistributedLogTool { return null; } } - }).ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - handleCache.clear(); - return BoxedUnit.UNIT; - } }); } @@ -736,10 +734,14 @@ public class DistributedLogAdmin extends DistributedLogTool { getLogSegmentMetadataStore()) : LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(), getLogSegmentMetadataStore()); + OrderedScheduler scheduler = OrderedScheduler.newBuilder() + .name("dlck-scheduler") + .corePoolSize(Runtime.getRuntime().availableProcessors()) + .build(); ExecutorService executorService = Executors.newCachedThreadPool(); BookKeeperClient bkc = getBookKeeperClient(); try { - checkAndRepairDLNamespace(getUri(), getFactory(), metadataUpdater, executorService, + checkAndRepairDLNamespace(getUri(), getFactory(), metadataUpdater, scheduler, bkc, getConf().getBKDigestPW(), verbose, !getForce(), 
concurrency); } finally { SchedulerUtils.shutdownScheduler(executorService, 5, TimeUnit.MINUTES); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java index 4650949..f7f4acf 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java @@ -17,6 +17,7 @@ */ package com.twitter.distributedlog.impl.logsegment; +import com.twitter.distributedlog.BookKeeperClient; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.LogSegmentMetadata; import com.twitter.distributedlog.exceptions.BKTransmitException; @@ -24,6 +25,7 @@ import com.twitter.distributedlog.injector.AsyncFailureInjector; import com.twitter.distributedlog.logsegment.LogSegmentEntryReader; import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; +import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.util.Future; @@ -33,13 +35,22 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.stats.StatsLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; import static 
com.google.common.base.Charsets.UTF_8; /** * BookKeeper Based Entry Store */ -public class BKLogSegmentEntryStore implements LogSegmentEntryStore, AsyncCallback.OpenCallback { +public class BKLogSegmentEntryStore implements + LogSegmentEntryStore, + AsyncCallback.OpenCallback, + AsyncCallback.DeleteCallback { + + private static final Logger logger = LoggerFactory.getLogger(BKLogSegmentEntryReader.class); private static class OpenReaderRequest { @@ -56,20 +67,32 @@ public class BKLogSegmentEntryStore implements LogSegmentEntryStore, AsyncCallba } + private static class DeleteLogSegmentRequest { + + private final LogSegmentMetadata segment; + private final Promise<LogSegmentMetadata> deletePromise; + + DeleteLogSegmentRequest(LogSegmentMetadata segment) { + this.segment = segment; + this.deletePromise = new Promise<LogSegmentMetadata>(); + } + + } + private final byte[] passwd; - private final BookKeeper bk; + private final BookKeeperClient bkc; private final OrderedScheduler scheduler; private final DistributedLogConfiguration conf; private final StatsLogger statsLogger; private final AsyncFailureInjector failureInjector; public BKLogSegmentEntryStore(DistributedLogConfiguration conf, - BookKeeper bk, + BookKeeperClient bkc, OrderedScheduler scheduler, StatsLogger statsLogger, AsyncFailureInjector failureInjector) { this.conf = conf; - this.bk = bk; + this.bkc = bkc; this.passwd = conf.getBKDigestPW().getBytes(UTF_8); this.scheduler = scheduler; this.statsLogger = statsLogger; @@ -77,6 +100,36 @@ public class BKLogSegmentEntryStore implements LogSegmentEntryStore, AsyncCallba } @Override + public Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment) { + DeleteLogSegmentRequest request = new DeleteLogSegmentRequest(segment); + BookKeeper bk; + try { + bk = this.bkc.get(); + } catch (IOException e) { + return Future.exception(e); + } + bk.asyncDeleteLedger(segment.getLogSegmentId(), this, request); + return request.deletePromise; + } + + @Override + 
public void deleteComplete(int rc, Object ctx) { + DeleteLogSegmentRequest deleteRequest = (DeleteLogSegmentRequest) ctx; /* ctx is the request object handed to asyncDeleteLedger */ + if (BKException.Code.NoSuchLedgerExistsException == rc) { + logger.warn("No ledger {} found to delete for {}.", + deleteRequest.segment.getLogSegmentId(), deleteRequest.segment); /* not treated as a failure: falls through and completes the promise with the segment */ + } else if (BKException.Code.OK != rc) { + logger.error("Couldn't delete ledger {} from bookkeeper for {} : {}", + new Object[]{ deleteRequest.segment.getLogSegmentId(), deleteRequest.segment, + BKException.getMessage(rc) }); + FutureUtils.setException(deleteRequest.deletePromise, + new BKTransmitException("Couldn't delete log segment " + deleteRequest.segment, rc)); + return; /* promise already failed; skip the success path below */ + } + FutureUtils.setValue(deleteRequest.deletePromise, deleteRequest.segment); + } + + @Override public Future<LogSegmentEntryWriter> openWriter(LogSegmentMetadata segment) { throw new UnsupportedOperationException("Not supported yet"); } @@ -84,6 +137,12 @@ public class BKLogSegmentEntryStore implements LogSegmentEntryStore, AsyncCallba @Override public Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment, long startEntryId) { + BookKeeper bk; + try { + bk = this.bkc.get(); + } catch (IOException e) { + return Future.exception(e); /* client unavailable: report through the returned future instead of throwing */ + } OpenReaderRequest request = new OpenReaderRequest(segment, startEntryId); if (segment.isInProgress()) { bk.asyncOpenLedgerNoRecovery( @@ -113,15 +172,64 @@ public class BKLogSegmentEntryStore implements LogSegmentEntryStore, AsyncCallba return; } // successfully open a ledger - LogSegmentEntryReader reader = new BKLogSegmentEntryReader( - request.segment, - lh, - request.startEntryId, - bk, - scheduler, - conf, - statsLogger, - failureInjector); - FutureUtils.setValue(request.openPromise, reader); + try { + LogSegmentEntryReader reader = new BKLogSegmentEntryReader( + request.segment, + lh, + request.startEntryId, + bkc.get(), /* fetched again inside the callback; an IOException here fails openPromise in the catch that follows */ + scheduler, + conf, + statsLogger, + failureInjector); + FutureUtils.setValue(request.openPromise, reader); + } catch 
(IOException e) { + FutureUtils.setException(request.openPromise, e); + } + + } + /** Opens a random access reader on the ledger backing the given segment. An in-progress segment is opened with asyncOpenLedgerNoRecovery unless fence is true; every other case uses asyncOpenLedger. The returned future fails with the IOException thrown by bkc.get(), or with BKTransmitException when the open completes with a non-OK code. */ + @Override + public Future<LogSegmentRandomAccessEntryReader> openRandomAccessReader(final LogSegmentMetadata segment, + final boolean fence) { + final BookKeeper bk; + try { + bk = this.bkc.get(); + } catch (IOException e) { + return Future.exception(e); + } + final Promise<LogSegmentRandomAccessEntryReader> openPromise = new Promise<LogSegmentRandomAccessEntryReader>(); + AsyncCallback.OpenCallback openCallback = new AsyncCallback.OpenCallback() { + @Override + public void openComplete(int rc, LedgerHandle lh, Object ctx) { + if (BKException.Code.OK != rc) { + FutureUtils.setException( + openPromise, + new BKTransmitException("Failed to open ledger handle for log segment " + segment, rc)); + return; + } + LogSegmentRandomAccessEntryReader reader = new BKLogSegmentRandomAccessEntryReader( + segment, + lh, + conf); + FutureUtils.setValue(openPromise, reader); + } + }; + if (segment.isInProgress() && !fence) { + bk.asyncOpenLedgerNoRecovery( + segment.getLogSegmentId(), + BookKeeper.DigestType.CRC32, + passwd, + openCallback, /* the dedicated callback must be the cb argument, otherwise openPromise is never completed */ + null); /* no ctx needed: the callback captures openPromise and segment */ + } else { + bk.asyncOpenLedger( + segment.getLogSegmentId(), + BookKeeper.DigestType.CRC32, + passwd, + openCallback, /* the dedicated callback must be the cb argument, otherwise openPromise is never completed */ + null); /* no ctx needed: the callback captures openPromise and segment */ + } + return openPromise; } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java new file mode 100644 index 0000000..9cec80c --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java @@ -0,0 
+1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.impl.logsegment; + +import com.google.common.collect.Lists; +import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.Entry; +import com.twitter.distributedlog.LogSegmentMetadata; +import com.twitter.distributedlog.exceptions.BKTransmitException; +import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader; +import com.twitter.distributedlog.util.FutureUtils; +import com.twitter.util.Future; +import com.twitter.util.Promise; +import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.LedgerHandle; + +import java.io.IOException; +import java.util.Enumeration; +import java.util.List; + +/** + * BookKeeper ledger based random access entry reader. 
+ */ +class BKLogSegmentRandomAccessEntryReader implements + LogSegmentRandomAccessEntryReader, + ReadCallback { + /* The four values below are captured once in the constructor from the segment metadata and the configuration. */ + private final long lssn; + private final long startSequenceId; + private final boolean envelopeEntries; + private final boolean deserializeRecordSet; + // state + private final LogSegmentMetadata metadata; + private final LedgerHandle lh; + private Promise<Void> closePromise = null; /* created by the first asyncClose call, assigned inside synchronized (this) */ + /** Captures the segment sequence number, start sequence id and envelope flag from metadata, and the record-set deserialization flag from conf. */ + BKLogSegmentRandomAccessEntryReader(LogSegmentMetadata metadata, + LedgerHandle lh, + DistributedLogConfiguration conf) { + this.metadata = metadata; + this.lssn = metadata.getLogSegmentSequenceNumber(); + this.startSequenceId = metadata.getStartSequenceId(); + this.envelopeEntries = metadata.getEnvelopeEntries(); + this.deserializeRecordSet = conf.getDeserializeRecordSetOnReads(); + this.lh = lh; + } + /** Returns the last add confirmed value reported by the ledger handle. */ + @Override + public long getLastAddConfirmed() { + return lh.getLastAddConfirmed(); + } + /** Issues an asynchronous read on the ledger handle with this object as the callback; the returned promise is completed in readComplete. */ + @Override + public Future<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId) { + Promise<List<Entry.Reader>> promise = new Promise<List<Entry.Reader>>(); + lh.asyncReadEntries(startEntryId, endEntryId, this, promise); + return promise; + } + /** Builds an Entry.Reader over the input stream of one ledger entry, tagged with the sequence info of this segment. */ + Entry.Reader processReadEntry(LedgerEntry entry) throws IOException { + return Entry.newBuilder() + .setLogSegmentInfo(lssn, startSequenceId) + .setEntryId(entry.getEntryId()) + .setEnvelopeEntry(envelopeEntries) + .deserializeRecordSet(deserializeRecordSet) + .setInputStream(entry.getEntryInputStream()) + .buildReader(); + } + /** Completes the promise passed as ctx: with the list of converted entries when rc is OK, with the first IOException raised during conversion, or with BKTransmitException carrying rc otherwise. */ + @Override + public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) { + Promise<List<Entry.Reader>> promise = (Promise<List<Entry.Reader>>) ctx; + if (BKException.Code.OK == rc) { + List<Entry.Reader> entryList = Lists.newArrayList(); + while (entries.hasMoreElements()) { + try { + entryList.add(processReadEntry(entries.nextElement())); + } catch (IOException ioe) { + FutureUtils.setException(promise, ioe); + return; + } + } + FutureUtils.setValue(promise, 
entryList); + } else { + FutureUtils.setException(promise, + new BKTransmitException("Failed to read entries :", rc)); + } + } + /** Closes the ledger handle once; later calls return the promise created by the first call. */ + @Override + public Future<Void> asyncClose() { + final Promise<Void> closeFuture; + synchronized (this) { + if (null != closePromise) { + return closePromise; + } + closeFuture = closePromise = new Promise<Void>(); + } + BKUtils.closeLedgers(lh).proxyTo(closeFuture); + return closeFuture; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java index d8611f9..850f9c8 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java @@ -28,6 +28,14 @@ import com.twitter.util.Future; public interface LogSegmentEntryStore { /** + * Delete the actual log segment from the entry store. + * + * @param segment metadata of the log segment to delete + * @return future representing the delete result + */ + Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment); + + /** * Open the writer for writing data to the log <i>segment</i>. * * @param segment the log <i>segment</i> to write data to @@ -45,4 +53,13 @@ public interface LogSegmentEntryStore { Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment, long startEntryId); + /** + * Open the reader for reading entries from a random access log <i>segment</i>. 
+ * + * @param segment the log <i>segment</i> to read entries from + * @param fence whether to fence the log segment when opening it + * @return future representing the opened random access reader + */ + Future<LogSegmentRandomAccessEntryReader> openRandomAccessReader(LogSegmentMetadata segment, + boolean fence); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java new file mode 100644 index 0000000..70472ca --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.twitter.distributedlog.logsegment; + +import com.twitter.distributedlog.Entry; +import com.twitter.distributedlog.io.AsyncCloseable; +import com.twitter.util.Future; + +import java.util.List; + +/** + * Reader that reads entries {@link com.twitter.distributedlog.Entry} + * from a random access log segment. + */ +public interface LogSegmentRandomAccessEntryReader extends AsyncCloseable { + + /** + * Read the entries in the closed range [startEntryId, endEntryId] from a random access log segment. + * + * @param startEntryId id of the first entry to read + * @param endEntryId id of the last entry to read + * @return a future that, when satisfied, contains the list of entries in [startEntryId, endEntryId]. + */ + Future<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId); + + /** + * Return the last add confirmed entry id (LAC). + * + * @return the last add confirmed entry id. + */ + long getLastAddConfirmed(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadPhase.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadPhase.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadPhase.java deleted file mode 100644 index 1f15221..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadPhase.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.readahead; - -/** - * Enum code represents readahead phases - */ -public enum ReadAheadPhase { - ERROR(-5), - TRUNCATED(-4), - INTERRUPTED(-3), - STOPPED(-2), - EXCEPTION_HANDLING(-1), - SCHEDULE_READAHEAD(0), - GET_LEDGERS(1), - OPEN_LEDGER(2), - CLOSE_LEDGER(3), - READ_LAST_CONFIRMED(4), - READ_ENTRIES(5); - - int code; - - ReadAheadPhase(int code) { - this.code = code; - } - - int getCode() { - return this.code; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java deleted file mode 100644 index a58218b..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.readahead; - -import com.twitter.distributedlog.ReadAheadCache; -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.StatsLogger; - -import java.util.concurrent.atomic.AtomicLong; - -/** - * ReadAheadTracker is tracking the progress of readahead worker. so we could use it to investigate where - * the readahead worker is. - */ -public class ReadAheadTracker { - // ticks is used to differentiate that the worker enter same phase in different time. - final AtomicLong ticks = new AtomicLong(0); - // which phase that the worker is in. 
- ReadAheadPhase phase; - private final StatsLogger statsLogger; - // Gauges and their labels - private static final String phaseGaugeLabel = "phase"; - private final Gauge<Number> phaseGauge; - private static final String ticksGaugeLabel = "ticks"; - private final Gauge<Number> ticksGauge; - private static final String cachEntriesGaugeLabel = "cache_entries"; - private final Gauge<Number> cacheEntriesGauge; - - ReadAheadTracker(String streamName, - final ReadAheadCache cache, - ReadAheadPhase initialPhase, - StatsLogger statsLogger) { - this.statsLogger = statsLogger; - this.phase = initialPhase; - phaseGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return ReadAheadPhase.SCHEDULE_READAHEAD.getCode(); - } - - @Override - public Number getSample() { - return phase.getCode(); - } - }; - this.statsLogger.registerGauge(phaseGaugeLabel, phaseGauge); - - ticksGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return ticks.get(); - } - }; - this.statsLogger.registerGauge(ticksGaugeLabel, ticksGauge); - - cacheEntriesGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return cache.getNumCachedEntries(); - } - }; - this.statsLogger.registerGauge(cachEntriesGaugeLabel, cacheEntriesGauge); - } - - ReadAheadPhase getPhase() { - return this.phase; - } - - public void enterPhase(ReadAheadPhase readAheadPhase) { - this.ticks.incrementAndGet(); - this.phase = readAheadPhase; - } - - public void unregisterGauge() { - this.statsLogger.unregisterGauge(phaseGaugeLabel, phaseGauge); - this.statsLogger.unregisterGauge(ticksGaugeLabel, ticksGauge); - this.statsLogger.unregisterGauge(cachEntriesGaugeLabel, cacheEntriesGauge); - } -}