http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java deleted file mode 100644 index 8529281..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java +++ /dev/null @@ -1,1503 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.readahead; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Stopwatch; -import com.twitter.distributedlog.exceptions.AlreadyTruncatedTransactionException; -import com.twitter.distributedlog.AsyncNotification; -import com.twitter.distributedlog.BKLogHandler; -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.LedgerDescriptor; -import com.twitter.distributedlog.LedgerHandleCache; -import com.twitter.distributedlog.LedgerReadPosition; -import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.exceptions.LogReadException; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.ReadAheadCache; -import com.twitter.distributedlog.callback.LogSegmentListener; -import com.twitter.distributedlog.callback.ReadAheadCallback; -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.metadata.LogMetadataForReader; -import com.twitter.distributedlog.injector.AsyncFailureInjector; -import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.distributedlog.logsegment.LogSegmentFilter; -import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.client.AsyncCallback; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; -import org.apache.bookkeeper.stats.AlertStatsLogger; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.util.MathUtils; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Function1; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import java.util.Enumeration; -import java.util.List; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * ReadAhead Worker process readahead in asynchronous way. The whole readahead process are chained into - * different phases: - * - * <p> - * ScheduleReadAheadPhase: Schedule the readahead request based on previous state (e.g. whether to backoff). - * After the readahead request was scheduled, the worker enters ReadAhead phase. - * </p> - * <p> - * ReadAhead Phase: This phase is divided into several sub-phases. All the sub-phases are chained into the - * execution flow. If errors happened during execution, the worker enters Exceptions Handling Phase. - * <br> - * CheckInProgressChangedPhase: check whether there is in progress ledgers changed and updated the metadata. - * <br> - * OpenLedgerPhase: open ledgers if necessary for following read requests. - * <br> - * ReadEntriesPhase: read entries from bookkeeper and fill the readahead cache. - * <br> - * After that, the worker goes back to Schedule Phase to schedule next readahead request. - * </p> - * <p> - * Exceptions Handling Phase: Handle all the exceptions and properly schedule next readahead request. - * </p> - */ -public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncCloseable, LogSegmentListener { - - private static final Logger LOG = LoggerFactory.getLogger(ReadAheadWorker.class); - - private static final int BKC_ZK_EXCEPTION_THRESHOLD_IN_SECONDS = 30; - private static final int BKC_UNEXPECTED_EXCEPTION_THRESHOLD = 3; - - // Stream information - private final String fullyQualifiedName; - private final DistributedLogConfiguration conf; - private final DynamicDistributedLogConfiguration dynConf; - private final LogMetadataForReader logMetadata; - private final BKLogHandler bkLedgerManager; - private final boolean isHandleForReading; - // Notification to notify readahead status - protected final AsyncNotification notification; - - // resources - protected final OrderedScheduler scheduler; - private final LedgerHandleCache handleCache; - private final ReadAheadCache readAheadCache; - - // ReadAhead Status - volatile boolean running = true; - Promise<Void> stopPromise = null; - private volatile boolean isCatchingUp = true; - private volatile boolean logDeleted = false; - private volatile boolean readAheadError = false; - private volatile boolean readAheadInterrupted = false; - private volatile boolean readingFromTruncated = false; - - // Exceptions Handling - volatile boolean encounteredException = false; - private final AtomicInteger bkcZkExceptions = new AtomicInteger(0); - private final AtomicInteger bkcUnExpectedExceptions = new AtomicInteger(0); - private final int noLedgerExceptionOnReadLACThreshold; - private final AtomicInteger bkcNoLedgerExceptionsOnReadLAC = new AtomicInteger(0); - - // Read Ahead Positions - private final LedgerReadPosition startReadPosition; - protected LedgerReadPosition nextReadAheadPosition; - - // - // LogSegments & Metadata Notification - // - - // variables related to zookeeper watcher notification to interrupt long poll waits - final Object notificationLock = new Object(); - AsyncNotification metadataNotification = null; - volatile long metadataNotificationTimeMillis = -1L; - - // variables related to log segments - private volatile boolean reInitializeMetadata = true; - private volatile boolean forceReadLogSegments = false; - volatile boolean inProgressChanged = false; - private LogSegmentMetadata currentMetadata = null; - private int currentMetadataIndex; - protected LedgerDescriptor currentLH; - private volatile List<LogSegmentMetadata> logSegmentListNotified; - private volatile List<LogSegmentMetadata> logSegmentList; - - // - // ReadAhead Phases - // - - final Phase schedulePhase = new ScheduleReadAheadPhase(); - final Phase exceptionHandler = new ExceptionHandlePhase(schedulePhase); - final Phase readAheadPhase = - new StoppablePhase( - new CheckInProgressChangedPhase( - new OpenLedgerPhase( - new ReadEntriesPhase(schedulePhase)))); - - // - // Stats, Tracing and Failure Injection - // - - // failure injector - private final AsyncFailureInjector failureInjector; - // trace - protected final long metadataLatencyWarnThresholdMillis; - final ReadAheadTracker tracker; - final Stopwatch resumeStopWatch; - final Stopwatch LACNotAdvancedStopWatch = Stopwatch.createUnstarted(); - // Misc - private final boolean readAheadSkipBrokenEntries; - // Stats - private final AlertStatsLogger alertStatsLogger; - private final StatsLogger readAheadPerStreamStatsLogger; - private final Counter readAheadWorkerWaits; - private final Counter readAheadEntryPiggyBackHits; - private final Counter readAheadEntryPiggyBackMisses; - private final Counter readAheadReadLACAndEntryCounter; - private final Counter readAheadCacheFullCounter; - private final Counter readAheadSkippedBrokenEntries; - private final Counter idleReaderWarn; - private final OpStatsLogger readAheadReadEntriesStat; - private final OpStatsLogger readAheadCacheResumeStat; - private final OpStatsLogger readAheadLacLagStats; - private final OpStatsLogger longPollInterruptionStat; - private final OpStatsLogger metadataReinitializationStat; - private final OpStatsLogger notificationExecutionStat; - private final ReadAheadExceptionsLogger readAheadExceptionsLogger; - - public ReadAheadWorker(DistributedLogConfiguration conf, - DynamicDistributedLogConfiguration dynConf, - LogMetadataForReader logMetadata, - BKLogHandler ledgerManager, - OrderedScheduler scheduler, - LedgerHandleCache handleCache, - LedgerReadPosition startPosition, - ReadAheadCache readAheadCache, - boolean isHandleForReading, - ReadAheadExceptionsLogger readAheadExceptionsLogger, - StatsLogger handlerStatsLogger, - StatsLogger readAheadPerStreamStatsLogger, - AlertStatsLogger alertStatsLogger, - AsyncFailureInjector failureInjector, - AsyncNotification notification) { - // Log information - this.fullyQualifiedName = logMetadata.getFullyQualifiedName(); - this.conf = conf; - this.dynConf = dynConf; - this.logMetadata = logMetadata; - this.bkLedgerManager = ledgerManager; - this.isHandleForReading = isHandleForReading; - this.notification = notification; - // Resources - this.scheduler = scheduler; - this.handleCache = handleCache; - this.readAheadCache = readAheadCache; - // Readahead status - this.startReadPosition = new LedgerReadPosition(startPosition); - this.nextReadAheadPosition = new LedgerReadPosition(startPosition); - // LogSegments - - // Failure Detection - this.failureInjector = failureInjector; - // Tracing - this.metadataLatencyWarnThresholdMillis = conf.getMetadataLatencyWarnThresholdMillis(); - this.noLedgerExceptionOnReadLACThreshold = - conf.getReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis() / conf.getReadAheadWaitTime(); - this.tracker = new ReadAheadTracker(logMetadata.getLogName(), readAheadCache, - ReadAheadPhase.SCHEDULE_READAHEAD, readAheadPerStreamStatsLogger); - this.resumeStopWatch = Stopwatch.createUnstarted(); - // Misc - this.readAheadSkipBrokenEntries = conf.getReadAheadSkipBrokenEntries(); - // Stats - this.alertStatsLogger = alertStatsLogger; - this.readAheadPerStreamStatsLogger = readAheadPerStreamStatsLogger; - StatsLogger readAheadStatsLogger = handlerStatsLogger.scope("readahead_worker"); - readAheadWorkerWaits = readAheadStatsLogger.getCounter("wait"); - readAheadEntryPiggyBackHits = readAheadStatsLogger.getCounter("entry_piggy_back_hits"); - readAheadEntryPiggyBackMisses = readAheadStatsLogger.getCounter("entry_piggy_back_misses"); - readAheadReadEntriesStat = readAheadStatsLogger.getOpStatsLogger("read_entries"); - readAheadReadLACAndEntryCounter = readAheadStatsLogger.getCounter("read_lac_and_entry_counter"); - readAheadCacheFullCounter = readAheadStatsLogger.getCounter("cache_full"); - readAheadSkippedBrokenEntries = readAheadStatsLogger.getCounter("skipped_broken_entries"); - readAheadCacheResumeStat = readAheadStatsLogger.getOpStatsLogger("resume"); - readAheadLacLagStats = readAheadStatsLogger.getOpStatsLogger("read_lac_lag"); - longPollInterruptionStat = readAheadStatsLogger.getOpStatsLogger("long_poll_interruption"); - notificationExecutionStat = readAheadStatsLogger.getOpStatsLogger("notification_execution"); - metadataReinitializationStat = readAheadStatsLogger.getOpStatsLogger("metadata_reinitialization"); - idleReaderWarn = readAheadStatsLogger.getCounter("idle_reader_warn"); - this.readAheadExceptionsLogger = readAheadExceptionsLogger; - } - - @VisibleForTesting - public LedgerReadPosition getNextReadAheadPosition() { - return nextReadAheadPosition; - } - - public LedgerDescriptor getCurrentLedgerDescriptor() { - return currentLH; - } - - // - // ReadAhead Status - // - - void setReadAheadError(ReadAheadTracker tracker, Throwable cause) { - LOG.error("Read Ahead for {} is set to error.", logMetadata.getFullyQualifiedName()); - readAheadError = true; - tracker.enterPhase(ReadAheadPhase.ERROR); - if (null != notification) { - notification.notifyOnError(cause); - } - if (null != stopPromise) { - FutureUtils.setValue(stopPromise, null); - } - } - - void setReadAheadInterrupted(ReadAheadTracker tracker) { - readAheadInterrupted = true; - tracker.enterPhase(ReadAheadPhase.INTERRUPTED); - if (null != notification) { - notification.notifyOnError(new DLInterruptedException("ReadAhead worker for " - + bkLedgerManager.getFullyQualifiedName() + " is interrupted.")); - } - if (null != stopPromise) { - FutureUtils.setValue(stopPromise, null); - } - } - - void setReadingFromTruncated(ReadAheadTracker tracker) { - readingFromTruncated = true; - tracker.enterPhase(ReadAheadPhase.TRUNCATED); - if (null != notification) { - notification.notifyOnError( - new AlreadyTruncatedTransactionException(logMetadata.getFullyQualifiedName() - + ": Trying to position read ahead to a segment that is marked truncated")); - } - if (null != stopPromise) { - FutureUtils.setValue(stopPromise, null); - } - } - - private void setReadAheadStopped() { - tracker.enterPhase(ReadAheadPhase.STOPPED); - if (null != stopPromise) { - FutureUtils.setValue(stopPromise, null); - } - LOG.info("Stopped ReadAheadWorker for {}", fullyQualifiedName); - } - - public void checkClosedOrInError() - throws LogNotFoundException, LogReadException, DLInterruptedException, - AlreadyTruncatedTransactionException { - if (logDeleted) { - throw new LogNotFoundException(logMetadata.getFullyQualifiedName() + " is already deleted."); - } else if (readingFromTruncated) { - throw new AlreadyTruncatedTransactionException( - String.format("%s: Trying to position read ahead a segment that is marked truncated", - logMetadata.getFullyQualifiedName())); - } else if (readAheadInterrupted) { - throw new DLInterruptedException(String.format("%s: ReadAhead Thread was interrupted", - logMetadata.getFullyQualifiedName())); - } else if (readAheadError) { - throw new LogReadException(String.format("%s: ReadAhead Thread encountered exceptions", - logMetadata.getFullyQualifiedName())); - } - } - - public boolean isCaughtUp() { - return !isCatchingUp; - } - - public void start(List<LogSegmentMetadata> segmentList) { - LOG.debug("Starting ReadAhead Worker for {} : segments = {}", - fullyQualifiedName, segmentList); - running = true; - logSegmentListNotified = segmentList; - schedulePhase.process(BKException.Code.OK); - } - - @Override - public Future<Void> asyncClose() { - LOG.info("Stopping Readahead worker for {}", fullyQualifiedName); - running = false; - // Unregister associated gauages to prevent GC spiral - this.tracker.unregisterGauge(); - - // Aside from unfortunate naming of variables, this allows - // the currently active long poll to be interrupted and completed - AsyncNotification notification; - synchronized (notificationLock) { - notification = metadataNotification; - metadataNotification = null; - } - if (null != notification) { - notification.notifyOnOperationComplete(); - } - if (null == stopPromise) { - return Future.Void(); - } - return FutureUtils.ignore(FutureUtils.within( - stopPromise, - 2 * conf.getReadAheadWaitTime(), - TimeUnit.MILLISECONDS, - new TimeoutException("Timeout on waiting for ReadAhead worker to stop " + fullyQualifiedName), - scheduler, - fullyQualifiedName)); - } - - @Override - public String toString() { - return "Running:" + running + - ", NextReadAheadPosition:" + nextReadAheadPosition + - ", BKZKExceptions:" + bkcZkExceptions.get() + - ", BKUnexpectedExceptions:" + bkcUnExpectedExceptions.get() + - ", EncounteredException:" + encounteredException + - ", readAheadError:" + readAheadError + - ", readAheadInterrupted" + readAheadInterrupted + - ", CurrentMetadata:" + ((null != currentMetadata) ? currentMetadata : "NONE") + - ", FailureInjector:" + failureInjector; - } - - @Override - public void resumeReadAhead() { - try { - long cacheResumeLatency = resumeStopWatch.stop().elapsed(TimeUnit.MICROSECONDS); - readAheadCacheResumeStat.registerSuccessfulEvent(cacheResumeLatency); - } catch (IllegalStateException ise) { - LOG.error("Encountered illegal state when stopping resume stop watch for {} : ", - logMetadata.getFullyQualifiedName(), ise); - } - submit(this); - } - - Runnable addRTEHandler(final Runnable runnable) { - return new Runnable() { - @Override - public void run() { - try { - runnable.run(); - } catch (RuntimeException rte) { - LOG.error("ReadAhead on stream {} encountered runtime exception", - logMetadata.getFullyQualifiedName(), rte); - setReadAheadError(tracker, rte); - throw rte; - } - } - }; - } - - <T> Function1<T, BoxedUnit> submit(final Function1<T, BoxedUnit> function) { - return new AbstractFunction1<T, BoxedUnit>() { - @Override - public BoxedUnit apply(final T input) { - submit(new Runnable() { - @Override - public void run() { - function.apply(input); - } - }); - return BoxedUnit.UNIT; - } - }; - } - - void submit(Runnable runnable) { - if (failureInjector.shouldInjectStops()) { - LOG.warn("Error injected: read ahead for stream {} is going to stall.", - logMetadata.getFullyQualifiedName()); - return; - } - - if (failureInjector.shouldInjectDelays()) { - int delayMs = failureInjector.getInjectedDelayMs(); - schedule(runnable, delayMs); - return; - } - - try { - scheduler.submit(addRTEHandler(runnable)); - } catch (RejectedExecutionException ree) { - setReadAheadError(tracker, ree); - } - } - - private void schedule(Runnable runnable, long timeInMillis) { - try { - InterruptibleScheduledRunnable task = new InterruptibleScheduledRunnable(runnable); - boolean executeImmediately = setMetadataNotification(task); - if (executeImmediately) { - scheduler.submit(addRTEHandler(task)); - return; - } - scheduler.schedule(addRTEHandler(task), timeInMillis, TimeUnit.MILLISECONDS); - readAheadWorkerWaits.inc(); - } catch (RejectedExecutionException ree) { - setReadAheadError(tracker, ree); - } - } - - private void handleException(ReadAheadPhase phase, int returnCode) { - readAheadExceptionsLogger.getBKExceptionStatsLogger(phase.name()).getExceptionCounter(returnCode).inc(); - exceptionHandler.process(returnCode); - } - - private boolean closeCurrentLedgerHandle() { - if (currentLH == null) { - return true; - } - boolean retVal = false; - LedgerDescriptor ld = currentLH; - try { - handleCache.closeLedger(ld); - currentLH = null; - retVal = true; - } catch (BKException bke) { - LOG.debug("BK Exception during closing {} : ", ld, bke); - handleException(ReadAheadPhase.CLOSE_LEDGER, bke.getCode()); - } - - return retVal; - } - - abstract class Phase { - - final Phase next; - - Phase(Phase next) { - this.next = next; - } - - abstract void process(int rc); - } - - /** - * Schedule next readahead request. If we need to backoff, schedule in a backoff delay. - */ - final class ScheduleReadAheadPhase extends Phase { - - ScheduleReadAheadPhase() { - super(null); - } - - @Override - void process(int rc) { - if (!running) { - setReadAheadStopped(); - return; - } - tracker.enterPhase(ReadAheadPhase.SCHEDULE_READAHEAD); - - boolean injectErrors = failureInjector.shouldInjectErrors(); - if (encounteredException || injectErrors) { - int zkErrorThreshold = BKC_ZK_EXCEPTION_THRESHOLD_IN_SECONDS * 1000 * 4 / conf.getReadAheadWaitTime(); - - if ((bkcZkExceptions.get() > zkErrorThreshold) || injectErrors) { - LOG.error("{} : BookKeeper Client used by the ReadAhead Thread has encountered {} zookeeper exceptions : simulate = {}", - new Object[] { fullyQualifiedName, bkcZkExceptions.get(), injectErrors }); - running = false; - setReadAheadError(tracker, new LogReadException( - "Encountered too many zookeeper issues on read ahead for " + bkLedgerManager.getFullyQualifiedName())); - } else if (bkcUnExpectedExceptions.get() > BKC_UNEXPECTED_EXCEPTION_THRESHOLD) { - LOG.error("{} : ReadAhead Thread has encountered {} unexpected BK exceptions.", - fullyQualifiedName, bkcUnExpectedExceptions.get()); - running = false; - setReadAheadError(tracker, new LogReadException( - "Encountered too many unexpected bookkeeper issues on read ahead for " + bkLedgerManager.getFullyQualifiedName())); - } else { - // We must always reinitialize metadata if the last attempt to read failed. - reInitializeMetadata = true; - encounteredException = false; - // Backoff before resuming - if (LOG.isTraceEnabled()) { - LOG.trace("Scheduling read ahead for {} after {} ms.", fullyQualifiedName, conf.getReadAheadWaitTime() / 4); - } - schedule(ReadAheadWorker.this, conf.getReadAheadWaitTime() / 4); - } - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("Scheduling read ahead for {} now.", fullyQualifiedName); - } - submit(ReadAheadWorker.this); - } - } - - } - - /** - * Phase on handling exceptions. - */ - final class ExceptionHandlePhase extends Phase { - - ExceptionHandlePhase(Phase next) { - super(next); - } - - @Override - void process(int rc) { - tracker.enterPhase(ReadAheadPhase.EXCEPTION_HANDLING); - - if (BKException.Code.InterruptedException == rc) { - LOG.trace("ReadAhead Worker for {} is interrupted.", fullyQualifiedName); - running = false; - setReadAheadInterrupted(tracker); - return; - } else if (BKException.Code.ZKException == rc) { - encounteredException = true; - int numExceptions = bkcZkExceptions.incrementAndGet(); - LOG.debug("ReadAhead Worker for {} encountered zookeeper exception : total exceptions are {}.", - fullyQualifiedName, numExceptions); - } else if (BKException.Code.OK != rc) { - encounteredException = true; - switch(rc) { - case BKException.Code.NoSuchEntryException: - case BKException.Code.LedgerRecoveryException: - case BKException.Code.NoSuchLedgerExistsException: - break; - default: - bkcUnExpectedExceptions.incrementAndGet(); - } - LOG.info("ReadAhead Worker for {} encountered exception : {}", - fullyQualifiedName, BKException.create(rc)); - } - // schedule next read ahead - next.process(BKException.Code.OK); - } - } - - /** - * A phase that could be stopped by a stopPromise - */ - final class StoppablePhase extends Phase { - - StoppablePhase(Phase next) { - super(next); - } - - @Override - void process(int rc) { - if (!running) { - setReadAheadStopped(); - return; - } - - if (null == stopPromise) { - stopPromise = new Promise<Void>(); - } - - // proceed the readahead request - next.process(BKException.Code.OK); - } - } - - /** - * Phase on checking in progress changed. - */ - final class CheckInProgressChangedPhase extends Phase - implements FutureEventListener<Versioned<List<LogSegmentMetadata>>> { - - CheckInProgressChangedPhase(Phase next) { - super(next); - } - - void processLogSegments(final List<LogSegmentMetadata> segments) { - // submit callback execution to dlg executor to avoid deadlock. - submit(new Runnable() { - @Override - public void run() { - logSegmentList = segments; - boolean isInitialPositioning = nextReadAheadPosition.definitelyLessThanOrEqualTo(startReadPosition); - for (int i = 0; i < logSegmentList.size(); i++) { - LogSegmentMetadata l = logSegmentList.get(i); - // By default we should skip truncated segments during initial positioning - if (l.isTruncated() && - isInitialPositioning && - !conf.getIgnoreTruncationStatus()) { - continue; - } - - DLSN nextReadDLSN = new DLSN(nextReadAheadPosition.getLogSegmentSequenceNumber(), - nextReadAheadPosition.getEntryId(), -1); - - // next read position still inside a log segment - final boolean hasDataToRead = (l.getLastDLSN().compareTo(nextReadDLSN) >= 0); - - // either there is data to read in current log segment or we are moving over a log segment that is - // still inprogress or was inprogress, we have check (or maybe close) this log segment. - final boolean checkOrCloseLedger = hasDataToRead || - // next read position move over a log segment, if l is still inprogress or it was inprogress - ((l.isInProgress() || (null != currentMetadata && currentMetadata.isInProgress())) && - l.getLogSegmentSequenceNumber() == nextReadAheadPosition.getLogSegmentSequenceNumber()); - - // If we are positioning on a partially truncated log segment then the truncation point should - // be before the nextReadPosition - if (l.isPartiallyTruncated() && - !isInitialPositioning && - (l.getMinActiveDLSN().compareTo(nextReadDLSN) > 0)) { - if (conf.getAlertWhenPositioningOnTruncated()) { - alertStatsLogger.raise("Trying to position reader on {} when {} is marked partially truncated", - nextReadAheadPosition, l); - } - - if (!conf.getIgnoreTruncationStatus()) { - LOG.error("{}: Trying to position reader on {} when {} is marked partially truncated", - new Object[]{ logMetadata.getFullyQualifiedName(), nextReadAheadPosition, l}); - setReadingFromTruncated(tracker); - return; - } - } - - - if (LOG.isTraceEnabled()) { - LOG.trace("CheckLogSegment : newMetadata = {}, currentMetadata = {}, nextReadAheadPosition = {}", - new Object[] { l, currentMetadata, nextReadAheadPosition}); - } - - if (checkOrCloseLedger) { - long startBKEntry = 0; - if (l.isPartiallyTruncated() && !conf.getIgnoreTruncationStatus()) { - startBKEntry = l.getMinActiveDLSN().getEntryId(); - } - - if(l.getLogSegmentSequenceNumber() == nextReadAheadPosition.getLogSegmentSequenceNumber()) { - startBKEntry = Math.max(startBKEntry, nextReadAheadPosition.getEntryId()); - if (currentMetadata != null) { - inProgressChanged = currentMetadata.isInProgress() && !l.isInProgress(); - } - } else { - // We are positioning on a new ledger => reset the current ledger handle - LOG.trace("Positioning {} on a new ledger {}", fullyQualifiedName, l); - - if (!closeCurrentLedgerHandle()) { - return; - } - } - - nextReadAheadPosition = new LedgerReadPosition(l.getLogSegmentId(), l.getLogSegmentSequenceNumber(), startBKEntry); - if (conf.getTraceReadAheadMetadataChanges()) { - LOG.info("Moved read position to {} for stream {} at {}.", - new Object[] {nextReadAheadPosition, logMetadata.getFullyQualifiedName(), System.currentTimeMillis() }); - } - - if (l.isTruncated()) { - if (conf.getAlertWhenPositioningOnTruncated()) { - alertStatsLogger.raise("Trying to position reader on {} when {} is marked truncated", - nextReadAheadPosition, l); - } - - if (!conf.getIgnoreTruncationStatus()) { - LOG.error("{}: Trying to position reader on {} when {} is marked truncated", - new Object[]{ logMetadata.getFullyQualifiedName(), nextReadAheadPosition, l}); - setReadingFromTruncated(tracker); - return; - } - } - - currentMetadata = l; - currentMetadataIndex = i; - break; - } - - // Handle multiple in progress => stop at the first in progress - if (l.isInProgress()) { - break; - } - } - - if (null == currentMetadata) { - if (isCatchingUp) { - isCatchingUp = false; - if (isHandleForReading) { - LOG.info("{} caught up at {}: position = {} and no log segment to position on at this point.", - new Object[] { fullyQualifiedName, System.currentTimeMillis(), nextReadAheadPosition }); - } - } - schedule(ReadAheadWorker.this, conf.getReadAheadWaitTimeOnEndOfStream()); - if (LOG.isDebugEnabled()) { - LOG.debug("No log segment to position on for {}. Backing off for {} millseconds", - fullyQualifiedName, conf.getReadAheadWaitTimeOnEndOfStream()); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Initialized metadata for {}, starting reading ahead from {} : {}.", - new Object[] { fullyQualifiedName, currentMetadataIndex, currentMetadata }); - } - next.process(BKException.Code.OK); - } - - // Once we have read the ledger list for the first time, subsequent segments - // should be read in a streaming manner and we should get the new ledgers as - // they close in ZK through watchers. - // So lets start observing the latency - bkLedgerManager.reportGetSegmentStats(true); - } - }); - } - - @Override - void process(int rc) { - if (!running) { - setReadAheadStopped(); - return; - } - - tracker.enterPhase(ReadAheadPhase.GET_LEDGERS); - - inProgressChanged = false; - if (LOG.isTraceEnabled()) { - LOG.trace("Checking {} if InProgress changed.", fullyQualifiedName); - } - - if (reInitializeMetadata || null == currentMetadata) { - reInitializeMetadata = false; - if (LOG.isTraceEnabled()) { - LOG.trace("Reinitializing metadata for {}.", fullyQualifiedName); - } - if (metadataNotificationTimeMillis > 0) { - long metadataReinitializeTimeMillis = System.currentTimeMillis(); - long elapsedMillisSinceMetadataChanged = metadataReinitializeTimeMillis - metadataNotificationTimeMillis; - if (elapsedMillisSinceMetadataChanged >= metadataLatencyWarnThresholdMillis) { - LOG.warn("{} reinitialize metadata at {}, which is {} millis after receiving notification at {}.", - new Object[] { logMetadata.getFullyQualifiedName(), metadataReinitializeTimeMillis, - elapsedMillisSinceMetadataChanged, metadataNotificationTimeMillis}); - } - metadataReinitializationStat.registerSuccessfulEvent( - TimeUnit.MILLISECONDS.toMicros(elapsedMillisSinceMetadataChanged)); - metadataNotificationTimeMillis = -1L; - } - if (forceReadLogSegments) { - forceReadLogSegments = false; - bkLedgerManager.readLogSegmentsFromStore( - LogSegmentMetadata.COMPARATOR, - LogSegmentFilter.DEFAULT_FILTER, - null - ).addEventListener(this); - } else { - processLogSegments(logSegmentListNotified); - } - } else { - next.process(BKException.Code.OK); - } - } - - @Override - public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) { - processLogSegments(segments.getValue()); - } - - @Override - public void onFailure(Throwable cause) { - LOG.info("Encountered metadata exception while reading log segments of {} : {}. Retrying ...", - bkLedgerManager.getFullyQualifiedName(), cause.getMessage()); - reInitializeMetadata = true; - forceReadLogSegments = true; - handleException(ReadAheadPhase.GET_LEDGERS, BKException.Code.ZKException); - } - } - - final class OpenLedgerPhase extends Phase - implements BookkeeperInternalCallbacks.GenericCallback<LedgerDescriptor>, - AsyncCallback.ReadLastConfirmedAndEntryCallback { - - OpenLedgerPhase(Phase next) { - super(next); - } - - private void issueReadLastConfirmedAndEntry(final boolean parallel, - final long lastAddConfirmed) { - final String ctx = String.format("ReadLastConfirmedAndEntry(%s, %d)", parallel? "Parallel":"Sequential", lastAddConfirmed); - final ReadLastConfirmedAndEntryCallbackWithNotification callback = - new ReadLastConfirmedAndEntryCallbackWithNotification(lastAddConfirmed, this, ctx); - boolean callbackImmediately = setMetadataNotification(callback); - handleCache.asyncReadLastConfirmedAndEntry( - currentLH, - nextReadAheadPosition.getEntryId(), - conf.getReadLACLongPollTimeout(), - parallel - ).addEventListener(new FutureEventListener<Pair<Long, LedgerEntry>>() { - @Override - public void onSuccess(Pair<Long, LedgerEntry> lacAndEntry) { - callback.readLastConfirmedAndEntryComplete( - BKException.Code.OK, - lacAndEntry.getLeft(), - lacAndEntry.getRight(), - ctx); - } - - @Override - public void onFailure(Throwable cause) { - callback.readLastConfirmedAndEntryComplete( - FutureUtils.bkResultCode(cause), - lastAddConfirmed, - null, - ctx); - } - }); - callback.callbackImmediately(callbackImmediately); - readAheadReadLACAndEntryCounter.inc(); - } - - @Override - void process(int rc) { - if (!running) { - setReadAheadStopped(); - return; - } - - tracker.enterPhase(ReadAheadPhase.OPEN_LEDGER); - - if (currentMetadata.isInProgress()) { // we don't want to fence the current journal - if (null == currentLH) { - if (conf.getTraceReadAheadMetadataChanges()) { - LOG.info("Opening inprogress ledger of {} for {} at {}.", - new Object[] { currentMetadata, fullyQualifiedName, System.currentTimeMillis() }); - } - handleCache.asyncOpenLedger(currentMetadata, false) - .addEventListener(new FutureEventListener<LedgerDescriptor>() { - @Override - public void onSuccess(LedgerDescriptor ld) { - operationComplete(BKException.Code.OK, ld); - } - - @Override - public void onFailure(Throwable cause) { - operationComplete(FutureUtils.bkResultCode(cause), null); - } - }); - } else { - final long lastAddConfirmed; - try { - lastAddConfirmed = handleCache.getLastAddConfirmed(currentLH); - } catch (BKException ie) { - // Exception is thrown due to no ledger handle - handleException(ReadAheadPhase.OPEN_LEDGER, ie.getCode()); - return; - } - - if (lastAddConfirmed < nextReadAheadPosition.getEntryId()) { - // This indicates that the currentMetadata is still marked in - // progress while we have already read all the entries. It might - // indicate a failure to detect metadata change. So we - // should probably try force read log segments if the reader has - // been idle for after a while. - if (LACNotAdvancedStopWatch.isRunning()) { - if (LACNotAdvancedStopWatch.elapsed(TimeUnit.MILLISECONDS) - > conf.getReaderIdleWarnThresholdMillis()) { - idleReaderWarn.inc(); - LOG.info("{} Ledger {} for inprogress segment {}, reader has been idle for warn threshold {}", - new Object[] { fullyQualifiedName, currentMetadata, currentLH, conf.getReaderIdleWarnThresholdMillis() }); - reInitializeMetadata = true; - forceReadLogSegments = true; - } - } else { - LACNotAdvancedStopWatch.reset().start(); - if (conf.getTraceReadAheadMetadataChanges()) { - LOG.info("{} Ledger {} for inprogress segment {} closed", - new Object[] { fullyQualifiedName, currentMetadata, currentLH }); - } - } - - tracker.enterPhase(ReadAheadPhase.READ_LAST_CONFIRMED); - - // the readahead is caught up if current ledger is in progress and read position moves over last add confirmed - if (isCatchingUp) { - isCatchingUp = false; - if (isHandleForReading) { - LOG.info("{} caught up at {}: lac = {}, position = {}.", - new Object[] { fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition }); - } - } - - LOG.trace("Reading last add confirmed of {} for {}, as read poistion has moved over {} : {}", - new Object[] { currentMetadata, fullyQualifiedName, lastAddConfirmed, nextReadAheadPosition }); - - if (nextReadAheadPosition.getEntryId() == 0 && conf.getTraceReadAheadMetadataChanges()) { - // we are waiting for first entry to arrive - LOG.info("Reading last add confirmed for {} at {}: lac = {}, position = {}.", - new Object[] { fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition}); - } else { - LOG.trace("Reading last add confirmed for {} at {}: lac = {}, position = {}.", - new Object[] { fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition}); - } - issueReadLastConfirmedAndEntry(false, lastAddConfirmed); - } else { - next.process(BKException.Code.OK); - } - } - } else { - LACNotAdvancedStopWatch.reset(); - if (null != currentLH) { - try { - if (inProgressChanged) { - LOG.trace("Closing completed ledger of {} for {}.", currentMetadata, fullyQualifiedName); - if (!closeCurrentLedgerHandle()) { - return; - } - inProgressChanged = false; - } else { - long lastAddConfirmed = handleCache.getLastAddConfirmed(currentLH); - if (nextReadAheadPosition.getEntryId() > lastAddConfirmed) { - // Its possible that the last entryId does not account for the control - // log record, but the lastAddConfirmed should never be short of the - // last entry id; else we maybe missing an entry - boolean gapDetected = false; - if (lastAddConfirmed < currentMetadata.getLastEntryId()) { - alertStatsLogger.raise("Unexpected last entry id during read ahead; {} , {}", - currentMetadata, lastAddConfirmed); - gapDetected = true; - } - - if (conf.getPositionGapDetectionEnabled() && gapDetected) { - setReadAheadError(tracker, new UnexpectedException( - "Unexpected last entry id during read ahead : " + currentMetadata - + ", lac = " + lastAddConfirmed)); - } else { - // This disconnect will only surface during repositioning and - // will not silently miss records; therefore its safe to not halt - // reading, but we should print a warning for easy diagnosis - if (conf.getTraceReadAheadMetadataChanges() && lastAddConfirmed > (currentMetadata.getLastEntryId() + 1)) { - LOG.warn("Potential Metadata Corruption {} for stream {}, lastAddConfirmed {}", - new Object[] {currentMetadata, logMetadata.getFullyQualifiedName(), lastAddConfirmed}); - } - - LOG.trace("Past the last Add Confirmed {} in ledger {} for {}", - new Object[] { lastAddConfirmed, currentMetadata, fullyQualifiedName }); - if (!closeCurrentLedgerHandle()) { - return; - } - LogSegmentMetadata oldMetadata = currentMetadata; - currentMetadata = null; - if (currentMetadataIndex + 1 < logSegmentList.size()) { - currentMetadata = logSegmentList.get(++currentMetadataIndex); - if (currentMetadata.getLogSegmentSequenceNumber() != (oldMetadata.getLogSegmentSequenceNumber() + 1)) { - // We should never get here as we should have exited the loop if - // pendingRequests were empty - alertStatsLogger.raise("Unexpected condition during read ahead; {} , {}", - currentMetadata, oldMetadata); - setReadAheadError(tracker, new UnexpectedException( - "Unexpected condition during read ahead : current metadata " - + currentMetadata + ", old metadata " + oldMetadata)); - } else { - if (currentMetadata.isTruncated()) { - if (conf.getAlertWhenPositioningOnTruncated()) { - alertStatsLogger.raise("Trying to position reader on the log segment that is marked truncated : {}", - currentMetadata); - } - - if (!conf.getIgnoreTruncationStatus()) { - LOG.error("{}: Trying to position reader on the log segment that is marked truncated : {}", - logMetadata.getFullyQualifiedName(), currentMetadata); - setReadingFromTruncated(tracker); - } - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("Moving read position to a new ledger {} for {}.", - currentMetadata, fullyQualifiedName); - } - nextReadAheadPosition.positionOnNewLogSegment(currentMetadata.getLogSegmentId(), currentMetadata.getLogSegmentSequenceNumber()); - } - } - } - } - } - } - if (!readAheadError) { - next.process(BKException.Code.OK); - } - } catch (BKException bke) { - LOG.debug("Exception while repositioning", bke); - handleException(ReadAheadPhase.CLOSE_LEDGER, bke.getCode()); - } - } else { - LOG.trace("Opening completed ledger of {} for {}.", currentMetadata, fullyQualifiedName); - handleCache.asyncOpenLedger(currentMetadata, true) - .addEventListener(new FutureEventListener<LedgerDescriptor>() { - @Override - public void onSuccess(LedgerDescriptor ld) { - operationComplete(BKException.Code.OK, ld); - } - - @Override - public void onFailure(Throwable cause) { - operationComplete(FutureUtils.bkResultCode(cause), null); - } - }); - } - } - - } - - @Override - public void operationComplete(final int rc, final LedgerDescriptor result) { - // submit callback execution to dlg executor to avoid deadlock. - submit(new Runnable() { - @Override - public void run() { - if (BKException.Code.OK != rc) { - LOG.debug("BK Exception {} while opening ledger", rc); - handleException(ReadAheadPhase.OPEN_LEDGER, rc); - return; - } - currentLH = result; - if (conf.getTraceReadAheadMetadataChanges()) { - LOG.info("Opened ledger of {} for {} at {}.", - new Object[]{currentMetadata, fullyQualifiedName, System.currentTimeMillis()}); - } - bkcZkExceptions.set(0); - bkcUnExpectedExceptions.set(0); - bkcNoLedgerExceptionsOnReadLAC.set(0); - next.process(rc); - } - }); - } - - /** - * Handle the result of reading last add confirmed. - * - * @param rc - * result of reading last add confirmed - */ - private void handleReadLastConfirmedError(int rc) { - if (BKException.Code.NoSuchLedgerExistsException == rc) { - if (bkcNoLedgerExceptionsOnReadLAC.incrementAndGet() > noLedgerExceptionOnReadLACThreshold) { - LOG.info("{} No entries published to ledger {} yet for {} millis.", - new Object[] { fullyQualifiedName, currentLH, conf.getReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis() }); - bkcNoLedgerExceptionsOnReadLAC.set(0); - // set current ledger handle to null, so it would be re-opened again. - // if the ledger does disappear, it would trigger re-initialize log segments on handling openLedger exceptions - if (closeCurrentLedgerHandle()) { - next.process(BKException.Code.OK); - } - return; - } else { - if (LOG.isTraceEnabled()) { - LOG.info("{} No entries published to ledger {} yet. Backoff reading ahead for {} ms.", - new Object[]{fullyQualifiedName, currentLH, conf.getReadAheadWaitTime()}); - } - // Backoff before resuming - schedule(ReadAheadWorker.this, conf.getReadAheadWaitTime()); - return; - } - } else if (BKException.Code.OK != rc) { - handleException(ReadAheadPhase.READ_LAST_CONFIRMED, rc); - return; - } - } - - public void readLastConfirmedAndEntryComplete(final int rc, final long lastConfirmed, final LedgerEntry entry, - final Object ctx) { - // submit callback execution to dlg executor to avoid deadlock. - submit(new Runnable() { - @Override - public void run() { - if (BKException.Code.OK != rc) { - handleReadLastConfirmedError(rc); - return; - } - bkcZkExceptions.set(0); - bkcUnExpectedExceptions.set(0); - bkcNoLedgerExceptionsOnReadLAC.set(0); - if (LOG.isTraceEnabled()) { - try { - LOG.trace("Advancing Last Add Confirmed of {} for {} : {}, {}", - new Object[] { currentMetadata, fullyQualifiedName, lastConfirmed, - handleCache.getLastAddConfirmed(currentLH) }); - } catch (BKException exc) { - // Ignore - } - } - - if ((null != entry) - && (nextReadAheadPosition.getEntryId() == entry.getEntryId()) - && (nextReadAheadPosition.getLedgerId() == entry.getLedgerId())) { - if (lastConfirmed <= 4 && conf.getTraceReadAheadMetadataChanges()) { - LOG.info("Hit readLastConfirmedAndEntry for {} at {} : entry = {}, lac = {}, position = {}.", - new Object[] { fullyQualifiedName, System.currentTimeMillis(), - entry.getEntryId(), lastConfirmed, nextReadAheadPosition }); - } - - if (!isCatchingUp) { - long lac = lastConfirmed - nextReadAheadPosition.getEntryId(); - if (lac > 0) { - readAheadLacLagStats.registerSuccessfulEvent(lac); - } - } - - nextReadAheadPosition.advance(); - - readAheadCache.set(new LedgerReadPosition(entry.getLedgerId(), currentLH.getLogSegmentSequenceNo(), entry.getEntryId()), - entry, null != ctx ? ctx.toString() : "", - currentMetadata.getEnvelopeEntries(), currentMetadata.getStartSequenceId()); - - if (LOG.isTraceEnabled()) { - LOG.trace("Reading the value received {} for {} : entryId {}", - new Object[] { currentMetadata, fullyQualifiedName, entry.getEntryId() }); - } - readAheadEntryPiggyBackHits.inc(); - } else { - if (lastConfirmed > nextReadAheadPosition.getEntryId()) { - LOG.info("{} : entry {} isn't piggybacked but last add confirmed already moves to {}.", - new Object[] { logMetadata.getFullyQualifiedName(), nextReadAheadPosition.getEntryId(), lastConfirmed }); - } - readAheadEntryPiggyBackMisses.inc(); - } - next.process(rc); - } - }); - } - } - - final class ReadEntriesPhase extends Phase implements Runnable { - - boolean cacheFull = false; - long lastAddConfirmed = -1; - - ReadEntriesPhase(Phase next) { - super(next); - } - - @Override - void process(int rc) { - if (!running) { - setReadAheadStopped(); - return; - } - - tracker.enterPhase(ReadAheadPhase.READ_ENTRIES); - - cacheFull = false; - lastAddConfirmed = -1; - if (null != currentLH) { - try { - lastAddConfirmed = handleCache.getLastAddConfirmed(currentLH); - } catch (BKException bke) { - handleException(ReadAheadPhase.READ_LAST_CONFIRMED, bke.getCode()); - return; - } - read(); - } else { - complete(); - } - } - - private void read() { - if (lastAddConfirmed < nextReadAheadPosition.getEntryId()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Nothing to read for {} of {} : lastAddConfirmed = {}, nextReadAheadPosition = {}", - new Object[] { currentMetadata, fullyQualifiedName, lastAddConfirmed, nextReadAheadPosition}); - } - complete(); - return; - } - if (LOG.isTraceEnabled()) { - LOG.trace("Reading entry {} for {} of {}.", - new Object[] {nextReadAheadPosition, currentMetadata, fullyQualifiedName }); - } - int readAheadBatchSize = dynConf.getReadAheadBatchSize(); - final long startEntryId = nextReadAheadPosition.getEntryId(); - final long endEntryId = Math.min(lastAddConfirmed, (nextReadAheadPosition.getEntryId() + readAheadBatchSize - 1)); - - if (endEntryId <= readAheadBatchSize && conf.getTraceReadAheadMetadataChanges()) { - // trace first read batch - LOG.info("Reading entries ({} - {}) for {} at {} : lac = {}, nextReadAheadPosition = {}.", - new Object[] { startEntryId, endEntryId, fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition}); - } - - final String readCtx = String.format("ReadEntries(%d-%d)", startEntryId, endEntryId); - handleCache.asyncReadEntries(currentLH, startEntryId, endEntryId) - .addEventListener(new FutureEventListener<Enumeration<LedgerEntry>>() { - - @Override - public void onSuccess(Enumeration<LedgerEntry> entries) { - int rc = BKException.Code.OK; - - if (failureInjector.shouldInjectCorruption(startEntryId, endEntryId)) { - rc = BKException.Code.DigestMatchException; - } - readComplete(rc, null, entries, readCtx, startEntryId, endEntryId); - } - - @Override - public void onFailure(Throwable cause) { - readComplete(FutureUtils.bkResultCode(cause), null, null, readCtx, startEntryId, endEntryId); - } - }); - } - - public void readComplete(final int rc, final LedgerHandle lh, - final Enumeration<LedgerEntry> seq, final Object ctx, - final long startEntryId, final long endEntryId) { - // submit callback execution to dlg executor to avoid deadlock. - submit(new Runnable() { - @Override - public void run() { - // If readAheadSkipBrokenEntries is enabled and we hit a corrupt entry, log and - // stat the issue and move forward. - if (BKException.Code.DigestMatchException == rc && readAheadSkipBrokenEntries) { - readAheadReadEntriesStat.registerFailedEvent(0); - LOG.error("BK DigestMatchException while reading entries {}-{} in stream {}, entry {} discarded", - new Object[] { startEntryId, endEntryId, fullyQualifiedName, startEntryId }); - bkcZkExceptions.set(0); - bkcUnExpectedExceptions.set(0); - readAheadSkippedBrokenEntries.inc(); - nextReadAheadPosition.advance(); - } else if (BKException.Code.OK != rc) { - readAheadReadEntriesStat.registerFailedEvent(0); - LOG.debug("BK Exception {} while reading entry", rc); - handleException(ReadAheadPhase.READ_ENTRIES, rc); - return; - } else { - int numReads = 0; - while (seq.hasMoreElements()) { - bkcZkExceptions.set(0); - bkcUnExpectedExceptions.set(0); - nextReadAheadPosition.advance(); - LedgerEntry e = seq.nextElement(); - LedgerReadPosition readPosition = new LedgerReadPosition(e.getLedgerId(), currentMetadata.getLogSegmentSequenceNumber(), e.getEntryId()); - readAheadCache.set(readPosition, e, null != ctx ? ctx.toString() : "", - currentMetadata.getEnvelopeEntries(), currentMetadata.getStartSequenceId()); - ++numReads; - if (LOG.isDebugEnabled()) { - LOG.debug("Read entry {} of {}.", readPosition, fullyQualifiedName); - } - } - readAheadReadEntriesStat.registerSuccessfulEvent(numReads); - } - if (readAheadCache.isCacheFull()) { - cacheFull = true; - complete(); - } else { - read(); - } - } - }); - } - - private void complete() { - if (cacheFull) { - LOG.trace("Cache for {} is full. Backoff reading until notified", fullyQualifiedName); - readAheadCacheFullCounter.inc(); - resumeStopWatch.reset().start(); - stopPromise = null; - readAheadCache.setReadAheadCallback(ReadAheadWorker.this); - } else { - run(); - } - } - - @Override - public void run() { - next.process(BKException.Code.OK); - } - } - - @Override - public void run() { - if (!running) { - setReadAheadStopped(); - return; - } - readAheadPhase.process(BKException.Code.OK); - } - - @Override - public void onSegmentsUpdated(List<LogSegmentMetadata> segments) { - AsyncNotification notification; - synchronized (notificationLock) { - logSegmentListNotified = segments; - reInitializeMetadata = true; - LOG.debug("{} Read ahead node changed", fullyQualifiedName); - notification = metadataNotification; - metadataNotification = null; - } - metadataNotificationTimeMillis = System.currentTimeMillis(); - if (null != notification) { - notification.notifyOnOperationComplete(); - } - } - - @Override - public void onLogStreamDeleted() { - logDeleted = true; - setReadAheadError(tracker, new LogNotFoundException("Log stream " - + bkLedgerManager.getFullyQualifiedName() + " is deleted.")); - } - - /** - * Set metadata notification and return the flag indicating whether to reinitialize metadata. - * - * @param notification - * metadata notification - * @return flag indicating whether to reinitialize metadata. - */ - private boolean setMetadataNotification(AsyncNotification notification) { - synchronized (notificationLock) { - this.metadataNotification = notification; - return reInitializeMetadata; - } - } - - @VisibleForTesting - public AsyncNotification getMetadataNotification() { - synchronized (notificationLock) { - return metadataNotification; - } - } - - /** - * A scheduled runnable that could be waken and executed immediately when notification arrives. - * - * E.g - * <p> - * The reader reaches end of stream, it backs off to schedule next read in 2 seconds. - * <br/> - * if a new log segment is created, without this change, reader has to wait 2 seconds to read - * entries in new log segment, which means delivery latency of entries in new log segment could - * be up to 2 seconds. but with this change, the task would be executed immediately, which reader - * would be waken up from backoff, which would reduce the delivery latency. - * </p> - */ - class InterruptibleScheduledRunnable implements AsyncNotification, Runnable { - - final Runnable task; - final AtomicBoolean called = new AtomicBoolean(false); - final long startNanos; - - InterruptibleScheduledRunnable(Runnable task) { - this.task = task; - this.startNanos = MathUtils.nowInNano(); - } - - @Override - public void notifyOnError(Throwable t) { - longPollInterruptionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startNanos)); - execute(); - } - - @Override - public void notifyOnOperationComplete() { - longPollInterruptionStat.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startNanos)); - execute(); - } - - @Override - public void run() { - if (called.compareAndSet(false, true)) { - task.run(); - } - } - - void execute() { - if (called.compareAndSet(false, true)) { - submit(task); - } - } - } - - abstract class LongPollNotification<T> implements AsyncNotification { - - final long lac; - final T cb; - final Object ctx; - final AtomicBoolean called = new AtomicBoolean(false); - final long startNanos; - - LongPollNotification(long lac, T cb, Object ctx) { - this.lac = lac; - this.cb = cb; - this.ctx = ctx; - this.startNanos = MathUtils.nowInNano(); - } - - void complete(boolean success) { - long startTime = MathUtils.nowInNano(); - doComplete(success); - if (success) { - notificationExecutionStat.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startTime)); - } else { - notificationExecutionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startTime)); - } - } - - abstract void doComplete(boolean success); - - @Override - public void notifyOnError(Throwable cause) { - longPollInterruptionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startNanos)); - complete(false); - } - - @Override - public void notifyOnOperationComplete() { - longPollInterruptionStat.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startNanos)); - complete(true); - } - - void callbackImmediately(boolean immediate) { - if (immediate) { - complete(true); - } - } - } - - class ReadLastConfirmedAndEntryCallbackWithNotification - extends LongPollNotification<AsyncCallback.ReadLastConfirmedAndEntryCallback> - implements AsyncCallback.ReadLastConfirmedAndEntryCallback { - - ReadLastConfirmedAndEntryCallbackWithNotification( - long lac, AsyncCallback.ReadLastConfirmedAndEntryCallback cb, Object ctx) { - super(lac, cb, ctx); - } - - @Override - public void readLastConfirmedAndEntryComplete(int rc, long lac, LedgerEntry entry, Object ctx) { - if (called.compareAndSet(false, true)) { - // clear the notification when callback - synchronized (notificationLock) { - metadataNotification = null; - } - this.cb.readLastConfirmedAndEntryComplete(rc, lac, entry, ctx); - } - } - - @Override - void doComplete(boolean success) { - readLastConfirmedAndEntryComplete(BKException.Code.OK, lac, null, ctx); - } - - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/ReadAheadExceptionsLogger.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/ReadAheadExceptionsLogger.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/ReadAheadExceptionsLogger.java deleted file mode 100644 index 326f92b..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/ReadAheadExceptionsLogger.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.stats; - -import org.apache.bookkeeper.stats.StatsLogger; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Stats logger to log exceptions happened in {@link com.twitter.distributedlog.readahead.ReadAheadWorker}. - * They are counters of exceptions happened on each read ahead phase: - * <code>`scope`/exceptions/`phase`/`code`</code>. `scope` is the current scope of - * stats logger, `phase` is the read ahead phase, while `code` is the exception code. Check - * {@link com.twitter.distributedlog.readahead.ReadAheadPhase} for details about phases and - * {@link BKExceptionStatsLogger} for details about `code`. - */ -public class ReadAheadExceptionsLogger { - - private final StatsLogger statsLogger; - private StatsLogger parentExceptionStatsLogger; - private final ConcurrentMap<String, BKExceptionStatsLogger> exceptionStatsLoggers = - new ConcurrentHashMap<String, BKExceptionStatsLogger>(); - - public ReadAheadExceptionsLogger(StatsLogger statsLogger) { - this.statsLogger = statsLogger; - } - - public BKExceptionStatsLogger getBKExceptionStatsLogger(String phase) { - // initialize the parent exception stats logger lazily - if (null == parentExceptionStatsLogger) { - parentExceptionStatsLogger = statsLogger.scope("exceptions"); - } - BKExceptionStatsLogger exceptionStatsLogger = exceptionStatsLoggers.get(phase); - if (null == exceptionStatsLogger) { - exceptionStatsLogger = new BKExceptionStatsLogger(parentExceptionStatsLogger.scope(phase)); - BKExceptionStatsLogger oldExceptionStatsLogger = - exceptionStatsLoggers.putIfAbsent(phase, exceptionStatsLogger); - if (null != oldExceptionStatsLogger) { - exceptionStatsLogger = oldExceptionStatsLogger; - } - } - return exceptionStatsLogger; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLedgerHandleCache.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLedgerHandleCache.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLedgerHandleCache.java deleted file mode 100644 index 1829e54..0000000 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLedgerHandleCache.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.twitter.distributedlog.util.FutureUtils; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.LedgerHandle; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static com.google.common.base.Charsets.UTF_8; -import static org.junit.Assert.*; - -/** - * Test {@link LedgerHandleCache} - */ -public class TestLedgerHandleCache extends TestDistributedLogBase { - static final Logger LOG = LoggerFactory.getLogger(TestLedgerHandleCache.class); - - protected static String ledgersPath = "/ledgers"; - - private ZooKeeperClient zkc; - private BookKeeperClient bkc; - - @Before - public void setup() throws Exception { - zkc = TestZooKeeperClientBuilder.newBuilder() - .zkServers(zkServers) - .build(); - bkc = BookKeeperClientBuilder.newBuilder() - .name("bkc") - .zkc(zkc) - .ledgersPath(ledgersPath) - .dlConfig(conf) - .build(); - } - - @After - public void teardown() throws Exception { - bkc.close(); - zkc.close(); - } - - @Test(timeout = 60000, expected = NullPointerException.class) - public void testBuilderWithoutBKC() throws Exception { - LedgerHandleCache.newBuilder().build(); - } - - @Test(timeout = 60000, expected = NullPointerException.class) - public void testBuilderWithoutStatsLogger() throws Exception { - LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).statsLogger(null).build(); - } - - @Test(timeout = 60000, expected = BKException.BKBookieHandleNotAvailableException.class) - public void testOpenLedgerWhenBkcClosed() throws Exception { - BookKeeperClient newBkc = BookKeeperClientBuilder.newBuilder().name("newBkc") - .zkc(zkc).ledgersPath(ledgersPath).dlConfig(conf).build(); - LedgerHandleCache cache = - LedgerHandleCache.newBuilder().bkc(newBkc).conf(conf).build(); - // closed the bkc - newBkc.close(); - // open ledger after bkc closed. - cache.openLedger(new LogSegmentMetadata.LogSegmentMetadataBuilder("", 2, 1, 1).setRegionId(1).build(), false); - } - - @Test(timeout = 60000, expected = BKException.ZKException.class) - public void testOpenLedgerWhenZkClosed() throws Exception { - ZooKeeperClient newZkc = TestZooKeeperClientBuilder.newBuilder() - .name("zkc-openledger-when-zk-closed") - .zkServers(zkServers) - .build(); - BookKeeperClient newBkc = BookKeeperClientBuilder.newBuilder() - .name("bkc-openledger-when-zk-closed") - .zkc(newZkc) - .ledgersPath(ledgersPath) - .dlConfig(conf) - .build(); - try { - LedgerHandle lh = newBkc.get().createLedger(BookKeeper.DigestType.CRC32, "zkcClosed".getBytes(UTF_8)); - lh.close(); - newZkc.close(); - LedgerHandleCache cache = - LedgerHandleCache.newBuilder().bkc(newBkc).conf(conf).build(); - // open ledger after zkc closed - cache.openLedger(new LogSegmentMetadata.LogSegmentMetadataBuilder("", - 2, lh.getId(), 1).setLogSegmentSequenceNo(lh.getId()).build(), false); - } finally { - newBkc.close(); - } - } - - @Test(timeout = 60000, expected = BKException.BKUnexpectedConditionException.class) - public void testReadLastConfirmedWithoutOpeningLedger() throws Exception { - LedgerDescriptor desc = new LedgerDescriptor(9999, 9999, false); - LedgerHandleCache cache = - LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build(); - // read last confirmed - cache.tryReadLastConfirmed(desc); - } - - @Test(timeout = 60000, expected = BKException.BKUnexpectedConditionException.class) - public void testReadEntriesWithoutOpeningLedger() throws Exception { - LedgerDescriptor desc = new LedgerDescriptor(9999, 9999, false); - LedgerHandleCache cache = - LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build(); - // read entries - cache.readEntries(desc, 0, 10); - } - - @Test(timeout = 60000, expected = BKException.BKUnexpectedConditionException.class) - public void testGetLastConfirmedWithoutOpeningLedger() throws Exception { - LedgerDescriptor desc = new LedgerDescriptor(9999, 9999, false); - LedgerHandleCache cache = - LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build(); - // read entries - cache.getLastAddConfirmed(desc); - } - - @Test(timeout = 60000, expected = BKException.BKUnexpectedConditionException.class) - public void testReadLastConfirmedAndEntryWithoutOpeningLedger() throws Exception { - LedgerDescriptor desc = new LedgerDescriptor(9999, 9999, false); - LedgerHandleCache cache = - LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build(); - // read entries - FutureUtils.bkResult(cache.asyncReadLastConfirmedAndEntry(desc, 1L, 200L, false)); - } - - @Test(timeout = 60000, expected = BKException.BKUnexpectedConditionException.class) - public void testGetLengthWithoutOpeningLedger() throws Exception { - LedgerDescriptor desc = new LedgerDescriptor(9999, 9999, false); - LedgerHandleCache cache = - LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build(); - // read entries - cache.getLength(desc); - } - - @Test(timeout = 60000) - public void testOpenAndCloseLedger() throws Exception { - LedgerHandle lh = bkc.get().createLedger(1, 1, 1, - BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8)); - LedgerHandleCache cache = - LedgerHandleCache.newBuilder().bkc(bkc).conf(conf).build(); - LogSegmentMetadata segment = new LogSegmentMetadata.LogSegmentMetadataBuilder( - "/data", LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID, lh.getId(), 0L) - .build(); - LedgerDescriptor desc1 = cache.openLedger(segment, false); - assertTrue(cache.handlesMap.containsKey(desc1)); - LedgerHandleCache.RefCountedLedgerHandle refLh = cache.handlesMap.get(desc1); - assertEquals(1, refLh.getRefCount()); - cache.openLedger(segment, false); - assertTrue(cache.handlesMap.containsKey(desc1)); - assertEquals(2, refLh.getRefCount()); - // close the ledger - cache.closeLedger(desc1); - assertTrue(cache.handlesMap.containsKey(desc1)); - assertEquals(1, refLh.getRefCount()); - cache.closeLedger(desc1); - assertFalse(cache.handlesMap.containsKey(desc1)); - assertEquals(0, refLh.getRefCount()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java index 573ae5c..74a5231 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java @@ -99,7 +99,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { true); LogSegmentEntryStore entryStore = new BKLogSegmentEntryStore( conf, - bkc.get(), + bkc, scheduler, NullStatsLogger.INSTANCE, AsyncFailureInjector.NULL); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java index 998c7ba..4358a8e 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java @@ -34,8 +34,6 @@ import org.junit.Test; import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction0; -import scala.runtime.BoxedUnit; import static org.junit.Assert.*; @@ -52,42 +50,22 @@ public class TestReadUtils extends TestDistributedLogBase { private Future<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId( BKDistributedLogManager bkdlm, int logsegmentIdx, long transactionId) throws Exception { List<LogSegmentMetadata> logSegments = bkdlm.getLogSegments(); - final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder() - .bkc(bkdlm.getWriterBKC()) - .conf(conf) - .build(); return ReadUtils.getLogRecordNotLessThanTxId( bkdlm.getStreamName(), logSegments.get(logsegmentIdx), transactionId, Executors.newSingleThreadExecutor(), - handleCache, + bkdlm.getReaderEntryStore(), 10 - ).ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - handleCache.clear(); - return BoxedUnit.UNIT; - } - }); + ); } private Future<LogRecordWithDLSN> getFirstGreaterThanRecord(BKDistributedLogManager bkdlm, int ledgerNo, DLSN dlsn) throws Exception { List<LogSegmentMetadata> ledgerList = bkdlm.getLogSegments(); - final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder() - .bkc(bkdlm.getWriterBKC()) - .conf(conf) - .build(); return ReadUtils.asyncReadFirstUserRecord( bkdlm.getStreamName(), ledgerList.get(ledgerNo), 2, 16, new AtomicInteger(0), Executors.newFixedThreadPool(1), - handleCache, dlsn - ).ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - handleCache.clear(); - return BoxedUnit.UNIT; - } - }); + bkdlm.getReaderEntryStore(), dlsn + ); } private Future<LogRecordWithDLSN> getLastUserRecord(BKDistributedLogManager bkdlm, int ledgerNo) throws Exception { @@ -98,20 +76,10 @@ public class TestReadUtils extends TestDistributedLogBase { LogSegmentFilter.DEFAULT_FILTER, null) ).getValue(); - final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder() - .bkc(bkdlm.getWriterBKC()) - .conf(conf) - .build(); return ReadUtils.asyncReadLastRecord( bkdlm.getStreamName(), ledgerList.get(ledgerNo), false, false, false, 2, 16, new AtomicInteger(0), Executors.newFixedThreadPool(1), - handleCache - ).ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - handleCache.clear(); - return BoxedUnit.UNIT; - } - }); + bkdlm.getReaderEntryStore() + ); } @Test(timeout = 60000)