DL-160: Remove 'DLSN' suffix from async and sync readers
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/c7751804 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/c7751804 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/c7751804 Branch: refs/heads/master Commit: c7751804ef728c39d54b19e52f2db48d7fba9f65 Parents: 7a97797 Author: Sijie Guo <sij...@twitter.com> Authored: Wed Dec 28 16:27:38 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Thu Dec 29 02:12:04 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/BKAsyncLogReader.java | 748 +++++++++++++++++++ .../distributedlog/BKAsyncLogReaderDLSN.java | 748 ------------------- .../distributedlog/BKDistributedLogManager.java | 8 +- .../twitter/distributedlog/BKSyncLogReader.java | 276 +++++++ .../distributedlog/BKSyncLogReaderDLSN.java | 276 ------- .../src/main/resources/findbugsExclude.xml | 2 +- .../NonBlockingReadsTestUtil.java | 6 +- .../distributedlog/TestAsyncReaderLock.java | 6 +- .../distributedlog/TestAsyncReaderWriter.java | 14 +- .../distributedlog/TestBKSyncLogReader.java | 6 +- .../TestNonBlockingReadsMultiReader.java | 2 +- .../com/twitter/distributedlog/TestReader.java | 2 +- .../distributedlog/TestRollLogSegments.java | 4 +- 13 files changed, 1049 insertions(+), 1049 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java new file mode 100644 index 0000000..18d2e15 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java @@ -0,0 +1,748 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; +import com.twitter.distributedlog.exceptions.DLIllegalStateException; +import com.twitter.distributedlog.exceptions.DLInterruptedException; +import com.twitter.distributedlog.exceptions.EndOfStreamException; +import com.twitter.distributedlog.exceptions.IdleReaderException; +import com.twitter.distributedlog.exceptions.LogNotFoundException; +import com.twitter.distributedlog.exceptions.ReadCancelledException; +import com.twitter.distributedlog.exceptions.UnexpectedException; +import com.twitter.distributedlog.util.Utils; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import com.twitter.util.Throw; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.versioning.Versioned; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Function1; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; + +/** + * BookKeeper based {@link AsyncLogReader} implementation. + * + * <h3>Metrics</h3> + * All the metrics are exposed under `async_reader`. + * <ul> + * <li> `async_reader`/future_set: opstats. time spent on satisfying futures of read requests. + * if it is high, it means that the caller takes time on processing the result of read requests. + * The side effect is blocking consequent reads. + * <li> `async_reader`/schedule: opstats. time spent on scheduling next reads. + * <li> `async_reader`/background_read: opstats. time spent on background reads. + * <li> `async_reader`/read_next_exec: opstats. time spent on executing {@link #readNext()}. + * <li> `async_reader`/time_between_read_next: opstats. time spent on between two consequent {@link #readNext()}. + * if it is high, it means that the caller is slowing down on calling {@link #readNext()}. + * <li> `async_reader`/delay_until_promise_satisfied: opstats. total latency for the read requests. + * <li> `async_reader`/idle_reader_error: counter. the number idle reader errors. + * </ul> + */ +class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { + static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReader.class); + + private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION = + new AbstractFunction1<List<LogRecordWithDLSN>, LogRecordWithDLSN>() { + @Override + public LogRecordWithDLSN apply(List<LogRecordWithDLSN> records) { + return records.get(0); + } + }; + + protected final BKDistributedLogManager bkDistributedLogManager; + protected final BKLogReadHandler readHandler; + private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(); + private final ScheduledExecutorService executorService; + private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>(); + private final Object scheduleLock = new Object(); + private final AtomicLong scheduleCount = new AtomicLong(0); + final private Stopwatch scheduleDelayStopwatch; + final private Stopwatch readNextDelayStopwatch; + private DLSN startDLSN; + private ReadAheadEntryReader readAheadReader = null; + private int lastPosition = 0; + private final boolean positionGapDetectionEnabled; + private final int idleErrorThresholdMillis; + final ScheduledFuture<?> idleReaderTimeoutTask; + private ScheduledFuture<?> backgroundScheduleTask = null; + // last process time + private final Stopwatch lastProcessTime; + + protected Promise<Void> closeFuture = null; + + private boolean lockStream = false; + + private final boolean returnEndOfStreamRecord; + + private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() { + @Override + public void run() { + synchronized (scheduleLock) { + backgroundScheduleTask = null; + } + scheduleBackgroundRead(); + } + }; + + // State + private Entry.Reader currentEntry = null; + private LogRecordWithDLSN nextRecord = null; + + // Failure Injector + private boolean disableProcessingReadRequests = false; + + // Stats + private final OpStatsLogger readNextExecTime; + private final OpStatsLogger delayUntilPromiseSatisfied; + private final OpStatsLogger timeBetweenReadNexts; + private final OpStatsLogger futureSetLatency; + private final OpStatsLogger scheduleLatency; + private final OpStatsLogger backgroundReaderRunTime; + private final Counter idleReaderCheckCount; + private final Counter idleReaderCheckIdleReadRequestCount; + private final Counter idleReaderCheckIdleReadAheadCount; + private final Counter idleReaderError; + + private class PendingReadRequest { + private final Stopwatch enqueueTime; + private final int numEntries; + private final List<LogRecordWithDLSN> records; + private final Promise<List<LogRecordWithDLSN>> promise; + private final long deadlineTime; + private final TimeUnit deadlineTimeUnit; + + PendingReadRequest(int numEntries, + long deadlineTime, + TimeUnit deadlineTimeUnit) { + this.numEntries = numEntries; + this.enqueueTime = Stopwatch.createStarted(); + // optimize the space usage for single read. + if (numEntries == 1) { + this.records = new ArrayList<LogRecordWithDLSN>(1); + } else { + this.records = new ArrayList<LogRecordWithDLSN>(); + } + this.promise = new Promise<List<LogRecordWithDLSN>>(); + this.deadlineTime = deadlineTime; + this.deadlineTimeUnit = deadlineTimeUnit; + } + + Promise<List<LogRecordWithDLSN>> getPromise() { + return promise; + } + + long elapsedSinceEnqueue(TimeUnit timeUnit) { + return enqueueTime.elapsed(timeUnit); + } + + void setException(Throwable throwable) { + Stopwatch stopwatch = Stopwatch.createStarted(); + if (promise.updateIfEmpty(new Throw<List<LogRecordWithDLSN>>(throwable))) { + futureSetLatency.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS)); + } + } + + boolean hasReadRecords() { + return records.size() > 0; + } + + boolean hasReadEnoughRecords() { + return records.size() >= numEntries; + } + + long getRemainingWaitTime() { + if (deadlineTime <= 0L) { + return 0L; + } + return deadlineTime - elapsedSinceEnqueue(deadlineTimeUnit); + } + + void addRecord(LogRecordWithDLSN record) { + records.add(record); + } + + void complete() { + if (LOG.isTraceEnabled()) { + LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size()); + } + delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS)); + Stopwatch stopwatch = Stopwatch.createStarted(); + promise.setValue(records); + futureSetLatency.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + } + } + + BKAsyncLogReader(BKDistributedLogManager bkdlm, + ScheduledExecutorService executorService, + DLSN startDLSN, + Optional<String> subscriberId, + boolean returnEndOfStreamRecord, + StatsLogger statsLogger) { + this.bkDistributedLogManager = bkdlm; + this.executorService = executorService; + this.readHandler = bkDistributedLogManager.createReadHandler(subscriberId, + this, true); + LOG.debug("Starting async reader at {}", startDLSN); + this.startDLSN = startDLSN; + this.scheduleDelayStopwatch = Stopwatch.createUnstarted(); + this.readNextDelayStopwatch = Stopwatch.createStarted(); + this.positionGapDetectionEnabled = bkdlm.getConf().getPositionGapDetectionEnabled(); + this.idleErrorThresholdMillis = bkdlm.getConf().getReaderIdleErrorThresholdMillis(); + this.returnEndOfStreamRecord = returnEndOfStreamRecord; + + // Stats + StatsLogger asyncReaderStatsLogger = statsLogger.scope("async_reader"); + futureSetLatency = asyncReaderStatsLogger.getOpStatsLogger("future_set"); + scheduleLatency = asyncReaderStatsLogger.getOpStatsLogger("schedule"); + backgroundReaderRunTime = asyncReaderStatsLogger.getOpStatsLogger("background_read"); + readNextExecTime = asyncReaderStatsLogger.getOpStatsLogger("read_next_exec"); + timeBetweenReadNexts = asyncReaderStatsLogger.getOpStatsLogger("time_between_read_next"); + delayUntilPromiseSatisfied = asyncReaderStatsLogger.getOpStatsLogger("delay_until_promise_satisfied"); + idleReaderError = asyncReaderStatsLogger.getCounter("idle_reader_error"); + idleReaderCheckCount = asyncReaderStatsLogger.getCounter("idle_reader_check_total"); + idleReaderCheckIdleReadRequestCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_read_requests"); + idleReaderCheckIdleReadAheadCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_readahead"); + + // Lock the stream if requested. The lock will be released when the reader is closed. + this.lockStream = false; + this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary(); + this.lastProcessTime = Stopwatch.createStarted(); + } + + private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() { + if (idleErrorThresholdMillis < Integer.MAX_VALUE) { + // Dont run the task more than once every seconds (for sanity) + long period = Math.max(idleErrorThresholdMillis / 10, 1000); + // Except when idle reader threshold is less than a second (tests?) + period = Math.min(period, idleErrorThresholdMillis / 5); + + return executorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + PendingReadRequest nextRequest = pendingRequests.peek(); + + idleReaderCheckCount.inc(); + if (null == nextRequest) { + return; + } + + idleReaderCheckIdleReadRequestCount.inc(); + if (nextRequest.elapsedSinceEnqueue(TimeUnit.MILLISECONDS) < idleErrorThresholdMillis) { + return; + } + + ReadAheadEntryReader readAheadReader = getReadAheadReader(); + + // read request has been idle + // - cache has records but read request are idle, + // that means notification was missed between readahead and reader. + // - cache is empty and readahead is idle (no records added for a long time) + idleReaderCheckIdleReadAheadCount.inc(); + try { + if (null == readAheadReader || (!hasMoreRecords() && + readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS))) { + markReaderAsIdle(); + return; + } else if (lastProcessTime.elapsed(TimeUnit.MILLISECONDS) > idleErrorThresholdMillis) { + markReaderAsIdle();; + } + } catch (IOException e) { + setLastException(e); + return; + } + } + }, period, period, TimeUnit.MILLISECONDS); + } + return null; + } + + synchronized ReadAheadEntryReader getReadAheadReader() { + return readAheadReader; + } + + void cancelIdleReaderTask() { + // Do this after we have checked that the reader was not previously closed + try { + if (null != idleReaderTimeoutTask) { + idleReaderTimeoutTask.cancel(true); + } + } catch (Exception exc) { + LOG.info("{}: Failed to cancel the background idle reader timeout task", readHandler.getFullyQualifiedName()); + } + } + + private void markReaderAsIdle() { + idleReaderError.inc(); + IdleReaderException ire = new IdleReaderException("Reader on stream " + + readHandler.getFullyQualifiedName() + + " is idle for " + idleErrorThresholdMillis +" ms"); + setLastException(ire); + // cancel all pending reads directly rather than notifying on error + // because idle reader could happen on idle read requests that usually means something wrong + // in scheduling reads + cancelAllPendingReads(ire); + } + + protected synchronized void setStartDLSN(DLSN fromDLSN) throws UnexpectedException { + if (null != readAheadReader) { + throw new UnexpectedException("Could't reset from dlsn after reader already starts reading."); + } + startDLSN = fromDLSN; + } + + @VisibleForTesting + public synchronized DLSN getStartDLSN() { + return startDLSN; + } + + public Future<Void> lockStream() { + this.lockStream = true; + return readHandler.lockStream(); + } + + private boolean checkClosedOrInError(String operation) { + if (null == lastException.get()) { + try { + if (null != readHandler && null != getReadAheadReader()) { + getReadAheadReader().checkLastException(); + } + + bkDistributedLogManager.checkClosedOrInError(operation); + } catch (IOException exc) { + setLastException(exc); + } + } + + if (lockStream) { + try { + readHandler.checkReadLock(); + } catch (IOException ex) { + setLastException(ex); + } + } + + if (null != lastException.get()) { + LOG.trace("Cancelling pending reads"); + cancelAllPendingReads(lastException.get()); + return true; + } + + return false; + } + + private void setLastException(IOException exc) { + lastException.compareAndSet(null, exc); + } + + @Override + public String getStreamName() { + return bkDistributedLogManager.getStreamName(); + } + + /** + * @return A promise that when satisfied will contain the Log Record with its DLSN. + */ + @Override + public synchronized Future<LogRecordWithDLSN> readNext() { + return readInternal(1, 0, TimeUnit.MILLISECONDS).map(READ_NEXT_MAP_FUNCTION); + } + + public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries) { + return readInternal(numEntries, 0, TimeUnit.MILLISECONDS); + } + + @Override + public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries, + long waitTime, + TimeUnit timeUnit) { + return readInternal(numEntries, waitTime, timeUnit); + } + + /** + * Read up to <i>numEntries</i> entries. The future will be satisfied when any number of entries are + * ready (1 to <i>numEntries</i>). + * + * @param numEntries + * num entries to read + * @return A promise that satisfied with a non-empty list of log records with their DLSN. + */ + private synchronized Future<List<LogRecordWithDLSN>> readInternal(int numEntries, + long deadlineTime, + TimeUnit deadlineTimeUnit) { + timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS)); + readNextDelayStopwatch.reset().start(); + final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit); + + if (null == readAheadReader) { + try { + final ReadAheadEntryReader readAheadEntryReader = this.readAheadReader = new ReadAheadEntryReader( + getStreamName(), + getStartDLSN(), + bkDistributedLogManager.getConf(), + readHandler, + bkDistributedLogManager.getReaderEntryStore(), + bkDistributedLogManager.getScheduler(), + Ticker.systemTicker(), + bkDistributedLogManager.alertStatsLogger); + readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() { + @Override + public void onSuccess(Void value) { + try { + readHandler.registerListener(readAheadEntryReader); + readHandler.asyncStartFetchLogSegments() + .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() { + @Override + public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) { + readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this); + readAheadEntryReader.start(logSegments.getValue()); + return BoxedUnit.UNIT; + } + }); + } catch (Exception exc) { + notifyOnError(exc); + } + } + + @Override + public void onFailure(Throwable cause) { + notifyOnError(cause); + } + }); + } catch (IOException ioe) { + notifyOnError(ioe); + } + } + + if (checkClosedOrInError("readNext")) { + readRequest.setException(lastException.get()); + } else { + boolean queueEmpty = pendingRequests.isEmpty(); + pendingRequests.add(readRequest); + + if (queueEmpty) { + scheduleBackgroundRead(); + } + } + + readNextExecTime.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS)); + readNextDelayStopwatch.reset().start(); + + return readRequest.getPromise(); + } + + public synchronized void scheduleBackgroundRead() { + // if the reader is already closed, we don't need to schedule background read again. + if (null != closeFuture) { + return; + } + + long prevCount = scheduleCount.getAndIncrement(); + if (0 == prevCount) { + scheduleDelayStopwatch.reset().start(); + executorService.submit(this); + } + } + + @Override + public Future<Void> asyncClose() { + // Cancel the idle reader timeout task, interrupting if necessary + ReadCancelledException exception; + Promise<Void> closePromise; + synchronized (this) { + if (null != closeFuture) { + return closeFuture; + } + closePromise = closeFuture = new Promise<Void>(); + exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed"); + setLastException(exception); + } + + // Do this after we have checked that the reader was not previously closed + cancelIdleReaderTask(); + + synchronized (scheduleLock) { + if (null != backgroundScheduleTask) { + backgroundScheduleTask.cancel(true); + } + } + + cancelAllPendingReads(exception); + + ReadAheadEntryReader readAheadReader = getReadAheadReader(); + if (null != readAheadReader) { + readHandler.unregisterListener(readAheadReader); + readAheadReader.removeStateChangeNotification(this); + } + Utils.closeSequence(bkDistributedLogManager.getScheduler(), true, + readAheadReader, + readHandler + ).proxyTo(closePromise); + return closePromise; + } + + private void cancelAllPendingReads(Throwable throwExc) { + for (PendingReadRequest promise : pendingRequests) { + promise.setException(throwExc); + } + pendingRequests.clear(); + } + + synchronized boolean hasMoreRecords() throws IOException { + if (null == readAheadReader) { + return false; + } + if (readAheadReader.getNumCachedEntries() > 0 || null != nextRecord) { + return true; + } else if (null != currentEntry) { + nextRecord = currentEntry.nextRecord(); + return null != nextRecord; + } + return false; + } + + private synchronized LogRecordWithDLSN readNextRecord() throws IOException { + if (null == readAheadReader) { + return null; + } + if (null == currentEntry) { + currentEntry = readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS); + // no entry after reading from read ahead then return null + if (null == currentEntry) { + return null; + } + } + + LogRecordWithDLSN recordToReturn; + if (null == nextRecord) { + nextRecord = currentEntry.nextRecord(); + // no more records in current entry + if (null == nextRecord) { + currentEntry = null; + return readNextRecord(); + } + } + + // found a record to return and prefetch the next one + recordToReturn = nextRecord; + nextRecord = currentEntry.nextRecord(); + return recordToReturn; + } + + @Override + public void run() { + synchronized(scheduleLock) { + if (scheduleDelayStopwatch.isRunning()) { + scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + } + + Stopwatch runTime = Stopwatch.createStarted(); + int iterations = 0; + long scheduleCountLocal = scheduleCount.get(); + LOG.debug("{}: Scheduled Background Reader", readHandler.getFullyQualifiedName()); + while(true) { + if (LOG.isTraceEnabled()) { + LOG.trace("{}: Executing Iteration: {}", readHandler.getFullyQualifiedName(), iterations++); + } + + PendingReadRequest nextRequest = null; + synchronized(this) { + nextRequest = pendingRequests.peek(); + + // Queue is empty, nothing to read, return + if (null == nextRequest) { + LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName()); + scheduleCount.set(0); + backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); + return; + } + + if (disableProcessingReadRequests) { + LOG.info("Reader of {} is forced to stop processing read requests", readHandler.getFullyQualifiedName()); + return; + } + } + lastProcessTime.reset().start(); + + // If the oldest pending promise is interrupted then we must mark + // the reader in error and abort all pending reads since we dont + // know the last consumed read + if (null == lastException.get()) { + if (nextRequest.getPromise().isInterrupted().isDefined()) { + setLastException(new DLInterruptedException("Interrupted on reading " + readHandler.getFullyQualifiedName() + " : ", + nextRequest.getPromise().isInterrupted().get())); + } + } + + if (checkClosedOrInError("readNext")) { + if (!(lastException.get().getCause() instanceof LogNotFoundException)) { + LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get()); + } + backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); + return; + } + + try { + // Fail 10% of the requests when asked to simulate errors + if (bkDistributedLogManager.getFailureInjector().shouldInjectErrors()) { + throw new IOException("Reader Simulated Exception"); + } + LogRecordWithDLSN record; + while (!nextRequest.hasReadEnoughRecords()) { + // read single record + do { + record = readNextRecord(); + } while (null != record && (record.isControl() || (record.getDlsn().compareTo(getStartDLSN()) < 0))); + if (null == record) { + break; + } else { + if (record.isEndOfStream() && !returnEndOfStreamRecord) { + setLastException(new EndOfStreamException("End of Stream Reached for " + + readHandler.getFullyQualifiedName())); + break; + } + + // gap detection + if (recordPositionsContainsGap(record, lastPosition)) { + bkDistributedLogManager.raiseAlert("Gap detected between records at record = {}", record); + if (positionGapDetectionEnabled) { + throw new DLIllegalStateException("Gap detected between records at record = " + record); + } + } + lastPosition = record.getLastPositionWithinLogSegment(); + + nextRequest.addRecord(record); + } + }; + } catch (IOException exc) { + setLastException(exc); + if (!(exc instanceof LogNotFoundException)) { + LOG.warn("{} : read with skip Exception", readHandler.getFullyQualifiedName(), lastException.get()); + } + continue; + } + + if (nextRequest.hasReadRecords()) { + long remainingWaitTime = nextRequest.getRemainingWaitTime(); + if (remainingWaitTime > 0 && !nextRequest.hasReadEnoughRecords()) { + backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); + scheduleDelayStopwatch.reset().start(); + scheduleCount.set(0); + // the request could still wait for more records + backgroundScheduleTask = executorService.schedule(BACKGROUND_READ_SCHEDULER, remainingWaitTime, nextRequest.deadlineTimeUnit); + return; + } + + PendingReadRequest request = pendingRequests.poll(); + if (null != request && nextRequest == request) { + request.complete(); + if (null != backgroundScheduleTask) { + backgroundScheduleTask.cancel(true); + backgroundScheduleTask = null; + } + } else { + DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at dlsn = " + + nextRequest.records.get(0).getDlsn()); + nextRequest.setException(ise); + if (null != request) { + request.setException(ise); + } + // We should never get here as we should have exited the loop if + // pendingRequests were empty + bkDistributedLogManager.raiseAlert("Unexpected condition at dlsn = {}", + nextRequest.records.get(0).getDlsn()); + setLastException(ise); + } + } else { + if (0 == scheduleCountLocal) { + LOG.trace("Schedule count dropping to zero", lastException.get()); + backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); + return; + } + scheduleCountLocal = scheduleCount.decrementAndGet(); + } + } + } + } + + private boolean recordPositionsContainsGap(LogRecordWithDLSN record, long lastPosition) { + final boolean firstLogRecord = (1 == record.getPositionWithinLogSegment()); + final boolean endOfStreamRecord = record.isEndOfStream(); + final boolean emptyLogSegment = (0 == lastPosition); + final boolean positionIncreasedByOne = (record.getPositionWithinLogSegment() == (lastPosition + 1)); + + return !firstLogRecord && !endOfStreamRecord && !emptyLogSegment && + !positionIncreasedByOne; + } + + /** + * Triggered when the background activity encounters an exception + */ + @Override + public void notifyOnError(Throwable cause) { + if (cause instanceof IOException) { + setLastException((IOException) cause); + } else { + setLastException(new IOException(cause)); + } + scheduleBackgroundRead(); + } + + /** + * Triggered when the background activity completes an operation + */ + @Override + public void notifyOnOperationComplete() { + scheduleBackgroundRead(); + } + + @VisibleForTesting + void simulateErrors() { + bkDistributedLogManager.getFailureInjector().injectErrors(true); + } + + @VisibleForTesting + synchronized void disableReadAheadLogSegmentsNotification() { + readHandler.disableReadAheadLogSegmentsNotification(); + } + + @VisibleForTesting + synchronized void disableProcessingReadRequests() { + disableProcessingReadRequests = true; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java deleted file mode 100644 index e347012..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java +++ /dev/null @@ -1,748 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Stopwatch; -import com.google.common.base.Ticker; -import com.twitter.distributedlog.exceptions.DLIllegalStateException; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.exceptions.EndOfStreamException; -import com.twitter.distributedlog.exceptions.IdleReaderException; -import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.exceptions.ReadCancelledException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.util.Utils; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import com.twitter.util.Throw; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.versioning.Versioned; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Function1; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -/** - * BookKeeper based {@link AsyncLogReader} implementation. - * - * <h3>Metrics</h3> - * All the metrics are exposed under `async_reader`. - * <ul> - * <li> `async_reader`/future_set: opstats. time spent on satisfying futures of read requests. - * if it is high, it means that the caller takes time on processing the result of read requests. - * The side effect is blocking consequent reads. - * <li> `async_reader`/schedule: opstats. time spent on scheduling next reads. - * <li> `async_reader`/background_read: opstats. time spent on background reads. - * <li> `async_reader`/read_next_exec: opstats. time spent on executing {@link #readNext()}. - * <li> `async_reader`/time_between_read_next: opstats. time spent on between two consequent {@link #readNext()}. - * if it is high, it means that the caller is slowing down on calling {@link #readNext()}. - * <li> `async_reader`/delay_until_promise_satisfied: opstats. total latency for the read requests. - * <li> `async_reader`/idle_reader_error: counter. the number idle reader errors. - * </ul> - */ -class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotification { - static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReaderDLSN.class); - - private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION = - new AbstractFunction1<List<LogRecordWithDLSN>, LogRecordWithDLSN>() { - @Override - public LogRecordWithDLSN apply(List<LogRecordWithDLSN> records) { - return records.get(0); - } - }; - - protected final BKDistributedLogManager bkDistributedLogManager; - protected final BKLogReadHandler readHandler; - private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(); - private final ScheduledExecutorService executorService; - private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>(); - private final Object scheduleLock = new Object(); - private final AtomicLong scheduleCount = new AtomicLong(0); - final private Stopwatch scheduleDelayStopwatch; - final private Stopwatch readNextDelayStopwatch; - private DLSN startDLSN; - private ReadAheadEntryReader readAheadReader = null; - private int lastPosition = 0; - private final boolean positionGapDetectionEnabled; - private final int idleErrorThresholdMillis; - final ScheduledFuture<?> idleReaderTimeoutTask; - private ScheduledFuture<?> backgroundScheduleTask = null; - // last process time - private final Stopwatch lastProcessTime; - - protected Promise<Void> closeFuture = null; - - private boolean lockStream = false; - - private final boolean returnEndOfStreamRecord; - - private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() { - @Override - public void run() { - synchronized (scheduleLock) { - backgroundScheduleTask = null; - } - scheduleBackgroundRead(); - } - }; - - // State - private Entry.Reader currentEntry = null; - private LogRecordWithDLSN nextRecord = null; - - // Failure Injector - private boolean disableProcessingReadRequests = false; - - // Stats - private final OpStatsLogger readNextExecTime; - private final OpStatsLogger delayUntilPromiseSatisfied; - private final OpStatsLogger timeBetweenReadNexts; - private final OpStatsLogger futureSetLatency; - private final OpStatsLogger scheduleLatency; - private final OpStatsLogger backgroundReaderRunTime; - private final Counter idleReaderCheckCount; - private final Counter idleReaderCheckIdleReadRequestCount; - private final Counter idleReaderCheckIdleReadAheadCount; - private final Counter idleReaderError; - - private class PendingReadRequest { - private final Stopwatch enqueueTime; - private final int numEntries; - private final List<LogRecordWithDLSN> records; - private final Promise<List<LogRecordWithDLSN>> promise; - private final long deadlineTime; - private final TimeUnit deadlineTimeUnit; - - PendingReadRequest(int numEntries, - long deadlineTime, - TimeUnit deadlineTimeUnit) { - this.numEntries = numEntries; - this.enqueueTime = Stopwatch.createStarted(); - // optimize the space usage for single read. - if (numEntries == 1) { - this.records = new ArrayList<LogRecordWithDLSN>(1); - } else { - this.records = new ArrayList<LogRecordWithDLSN>(); - } - this.promise = new Promise<List<LogRecordWithDLSN>>(); - this.deadlineTime = deadlineTime; - this.deadlineTimeUnit = deadlineTimeUnit; - } - - Promise<List<LogRecordWithDLSN>> getPromise() { - return promise; - } - - long elapsedSinceEnqueue(TimeUnit timeUnit) { - return enqueueTime.elapsed(timeUnit); - } - - void setException(Throwable throwable) { - Stopwatch stopwatch = Stopwatch.createStarted(); - if (promise.updateIfEmpty(new Throw<List<LogRecordWithDLSN>>(throwable))) { - futureSetLatency.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS)); - } - } - - boolean hasReadRecords() { - return records.size() > 0; - } - - boolean hasReadEnoughRecords() { - return records.size() >= numEntries; - } - - long getRemainingWaitTime() { - if (deadlineTime <= 0L) { - return 0L; - } - return deadlineTime - elapsedSinceEnqueue(deadlineTimeUnit); - } - - void addRecord(LogRecordWithDLSN record) { - records.add(record); - } - - void complete() { - if (LOG.isTraceEnabled()) { - LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size()); - } - delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS)); - Stopwatch stopwatch = Stopwatch.createStarted(); - promise.setValue(records); - futureSetLatency.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - } - } - - BKAsyncLogReaderDLSN(BKDistributedLogManager bkdlm, - ScheduledExecutorService executorService, - DLSN startDLSN, - Optional<String> subscriberId, - boolean returnEndOfStreamRecord, - StatsLogger statsLogger) { - this.bkDistributedLogManager = bkdlm; - this.executorService = executorService; - this.readHandler = bkDistributedLogManager.createReadHandler(subscriberId, - this, true); - LOG.debug("Starting async reader at {}", startDLSN); - this.startDLSN = startDLSN; - this.scheduleDelayStopwatch = Stopwatch.createUnstarted(); - this.readNextDelayStopwatch = Stopwatch.createStarted(); - this.positionGapDetectionEnabled = bkdlm.getConf().getPositionGapDetectionEnabled(); - this.idleErrorThresholdMillis = bkdlm.getConf().getReaderIdleErrorThresholdMillis(); - this.returnEndOfStreamRecord = returnEndOfStreamRecord; - - // Stats - StatsLogger asyncReaderStatsLogger = statsLogger.scope("async_reader"); - futureSetLatency = asyncReaderStatsLogger.getOpStatsLogger("future_set"); - scheduleLatency = asyncReaderStatsLogger.getOpStatsLogger("schedule"); - backgroundReaderRunTime = asyncReaderStatsLogger.getOpStatsLogger("background_read"); - readNextExecTime = asyncReaderStatsLogger.getOpStatsLogger("read_next_exec"); - timeBetweenReadNexts = asyncReaderStatsLogger.getOpStatsLogger("time_between_read_next"); - delayUntilPromiseSatisfied = asyncReaderStatsLogger.getOpStatsLogger("delay_until_promise_satisfied"); - idleReaderError = asyncReaderStatsLogger.getCounter("idle_reader_error"); - idleReaderCheckCount = asyncReaderStatsLogger.getCounter("idle_reader_check_total"); - idleReaderCheckIdleReadRequestCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_read_requests"); - idleReaderCheckIdleReadAheadCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_readahead"); - - // Lock the stream if requested. The lock will be released when the reader is closed. - this.lockStream = false; - this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary(); - this.lastProcessTime = Stopwatch.createStarted(); - } - - private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() { - if (idleErrorThresholdMillis < Integer.MAX_VALUE) { - // Dont run the task more than once every seconds (for sanity) - long period = Math.max(idleErrorThresholdMillis / 10, 1000); - // Except when idle reader threshold is less than a second (tests?) - period = Math.min(period, idleErrorThresholdMillis / 5); - - return executorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - PendingReadRequest nextRequest = pendingRequests.peek(); - - idleReaderCheckCount.inc(); - if (null == nextRequest) { - return; - } - - idleReaderCheckIdleReadRequestCount.inc(); - if (nextRequest.elapsedSinceEnqueue(TimeUnit.MILLISECONDS) < idleErrorThresholdMillis) { - return; - } - - ReadAheadEntryReader readAheadReader = getReadAheadReader(); - - // read request has been idle - // - cache has records but read request are idle, - // that means notification was missed between readahead and reader. - // - cache is empty and readahead is idle (no records added for a long time) - idleReaderCheckIdleReadAheadCount.inc(); - try { - if (null == readAheadReader || (!hasMoreRecords() && - readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS))) { - markReaderAsIdle(); - return; - } else if (lastProcessTime.elapsed(TimeUnit.MILLISECONDS) > idleErrorThresholdMillis) { - markReaderAsIdle();; - } - } catch (IOException e) { - setLastException(e); - return; - } - } - }, period, period, TimeUnit.MILLISECONDS); - } - return null; - } - - synchronized ReadAheadEntryReader getReadAheadReader() { - return readAheadReader; - } - - void cancelIdleReaderTask() { - // Do this after we have checked that the reader was not previously closed - try { - if (null != idleReaderTimeoutTask) { - idleReaderTimeoutTask.cancel(true); - } - } catch (Exception exc) { - LOG.info("{}: Failed to cancel the background idle reader timeout task", readHandler.getFullyQualifiedName()); - } - } - - private void markReaderAsIdle() { - idleReaderError.inc(); - IdleReaderException ire = new IdleReaderException("Reader on stream " - + readHandler.getFullyQualifiedName() - + " is idle for " + idleErrorThresholdMillis +" ms"); - setLastException(ire); - // cancel all pending reads directly rather than notifying on error - // because idle reader could happen on idle read requests that usually means something wrong - // in scheduling reads - cancelAllPendingReads(ire); - } - - protected synchronized void setStartDLSN(DLSN fromDLSN) throws UnexpectedException { - if (null != readAheadReader) { - throw new UnexpectedException("Could't reset from dlsn after reader already starts reading."); - } - startDLSN = fromDLSN; - } - - @VisibleForTesting - public synchronized DLSN getStartDLSN() { - return startDLSN; - } - - public Future<Void> lockStream() { - this.lockStream = true; - return readHandler.lockStream(); - } - - private boolean checkClosedOrInError(String operation) { - if (null == lastException.get()) { - try { - if (null != readHandler && null != getReadAheadReader()) { - getReadAheadReader().checkLastException(); - } - - bkDistributedLogManager.checkClosedOrInError(operation); - } catch (IOException exc) { - setLastException(exc); - } - } - - if (lockStream) { - try { - readHandler.checkReadLock(); - } catch (IOException ex) { - setLastException(ex); - } - } - - if (null != lastException.get()) { - LOG.trace("Cancelling pending reads"); - cancelAllPendingReads(lastException.get()); - return true; - } - - return false; - } - - private void setLastException(IOException exc) { - lastException.compareAndSet(null, exc); - } - - @Override - public String getStreamName() { - return bkDistributedLogManager.getStreamName(); - } - - /** - * @return A promise that when satisfied will contain the Log Record with its DLSN. - */ - @Override - public synchronized Future<LogRecordWithDLSN> readNext() { - return readInternal(1, 0, TimeUnit.MILLISECONDS).map(READ_NEXT_MAP_FUNCTION); - } - - public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries) { - return readInternal(numEntries, 0, TimeUnit.MILLISECONDS); - } - - @Override - public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries, - long waitTime, - TimeUnit timeUnit) { - return readInternal(numEntries, waitTime, timeUnit); - } - - /** - * Read up to <i>numEntries</i> entries. The future will be satisfied when any number of entries are - * ready (1 to <i>numEntries</i>). - * - * @param numEntries - * num entries to read - * @return A promise that satisfied with a non-empty list of log records with their DLSN. - */ - private synchronized Future<List<LogRecordWithDLSN>> readInternal(int numEntries, - long deadlineTime, - TimeUnit deadlineTimeUnit) { - timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS)); - readNextDelayStopwatch.reset().start(); - final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit); - - if (null == readAheadReader) { - try { - final ReadAheadEntryReader readAheadEntryReader = this.readAheadReader = new ReadAheadEntryReader( - getStreamName(), - getStartDLSN(), - bkDistributedLogManager.getConf(), - readHandler, - bkDistributedLogManager.getReaderEntryStore(), - bkDistributedLogManager.getScheduler(), - Ticker.systemTicker(), - bkDistributedLogManager.alertStatsLogger); - readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() { - @Override - public void onSuccess(Void value) { - try { - readHandler.registerListener(readAheadEntryReader); - readHandler.asyncStartFetchLogSegments() - .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() { - @Override - public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) { - readAheadEntryReader.addStateChangeNotification(BKAsyncLogReaderDLSN.this); - readAheadEntryReader.start(logSegments.getValue()); - return BoxedUnit.UNIT; - } - }); - } catch (Exception exc) { - notifyOnError(exc); - } - } - - @Override - public void onFailure(Throwable cause) { - notifyOnError(cause); - } - }); - } catch (IOException ioe) { - notifyOnError(ioe); - } - } - - if (checkClosedOrInError("readNext")) { - readRequest.setException(lastException.get()); - } else { - boolean queueEmpty = pendingRequests.isEmpty(); - pendingRequests.add(readRequest); - - if (queueEmpty) { - scheduleBackgroundRead(); - } - } - - readNextExecTime.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS)); - readNextDelayStopwatch.reset().start(); - - return readRequest.getPromise(); - } - - public synchronized void scheduleBackgroundRead() { - // if the reader is already closed, we don't need to schedule background read again. - if (null != closeFuture) { - return; - } - - long prevCount = scheduleCount.getAndIncrement(); - if (0 == prevCount) { - scheduleDelayStopwatch.reset().start(); - executorService.submit(this); - } - } - - @Override - public Future<Void> asyncClose() { - // Cancel the idle reader timeout task, interrupting if necessary - ReadCancelledException exception; - Promise<Void> closePromise; - synchronized (this) { - if (null != closeFuture) { - return closeFuture; - } - closePromise = closeFuture = new Promise<Void>(); - exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed"); - setLastException(exception); - } - - // Do this after we have checked that the reader was not previously closed - cancelIdleReaderTask(); - - synchronized (scheduleLock) { - if (null != backgroundScheduleTask) { - backgroundScheduleTask.cancel(true); - } - } - - cancelAllPendingReads(exception); - - ReadAheadEntryReader readAheadReader = getReadAheadReader(); - if (null != readAheadReader) { - readHandler.unregisterListener(readAheadReader); - readAheadReader.removeStateChangeNotification(this); - } - Utils.closeSequence(bkDistributedLogManager.getScheduler(), true, - readAheadReader, - readHandler - ).proxyTo(closePromise); - return closePromise; - } - - private void cancelAllPendingReads(Throwable throwExc) { - for (PendingReadRequest promise : pendingRequests) { - promise.setException(throwExc); - } - pendingRequests.clear(); - } - - synchronized boolean hasMoreRecords() throws IOException { - if (null == readAheadReader) { - return false; - } - if (readAheadReader.getNumCachedEntries() > 0 || null != nextRecord) { - return true; - } else if (null != currentEntry) { - nextRecord = currentEntry.nextRecord(); - return null != nextRecord; - } - return false; - } - - private synchronized LogRecordWithDLSN readNextRecord() throws IOException { - if (null == readAheadReader) { - return null; - } - if (null == currentEntry) { - currentEntry = readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS); - // no entry after reading from read ahead then return null - if (null == currentEntry) { - return null; - } - } - - LogRecordWithDLSN recordToReturn; - if (null == nextRecord) { - nextRecord = currentEntry.nextRecord(); - // no more records in current entry - if (null == nextRecord) { - currentEntry = null; - return readNextRecord(); - } - } - - // found a record to return and prefetch the next one - recordToReturn = nextRecord; - nextRecord = currentEntry.nextRecord(); - return recordToReturn; - } - - @Override - public void run() { - synchronized(scheduleLock) { - if (scheduleDelayStopwatch.isRunning()) { - scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - } - - Stopwatch runTime = Stopwatch.createStarted(); - int iterations = 0; - long scheduleCountLocal = scheduleCount.get(); - LOG.debug("{}: Scheduled Background Reader", readHandler.getFullyQualifiedName()); - while(true) { - if (LOG.isTraceEnabled()) { - LOG.trace("{}: Executing Iteration: {}", readHandler.getFullyQualifiedName(), iterations++); - } - - PendingReadRequest nextRequest = null; - synchronized(this) { - nextRequest = pendingRequests.peek(); - - // Queue is empty, nothing to read, return - if (null == nextRequest) { - LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName()); - scheduleCount.set(0); - backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); - return; - } - - if (disableProcessingReadRequests) { - LOG.info("Reader of {} is forced to stop processing read requests", readHandler.getFullyQualifiedName()); - return; - } - } - lastProcessTime.reset().start(); - - // If the oldest pending promise is interrupted then we must mark - // the reader in error and abort all pending reads since we dont - // know the last consumed read - if (null == lastException.get()) { - if (nextRequest.getPromise().isInterrupted().isDefined()) { - setLastException(new DLInterruptedException("Interrupted on reading " + readHandler.getFullyQualifiedName() + " : ", - nextRequest.getPromise().isInterrupted().get())); - } - } - - if (checkClosedOrInError("readNext")) { - if (!(lastException.get().getCause() instanceof LogNotFoundException)) { - LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get()); - } - backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); - return; - } - - try { - // Fail 10% of the requests when asked to simulate errors - if (bkDistributedLogManager.getFailureInjector().shouldInjectErrors()) { - throw new IOException("Reader Simulated Exception"); - } - LogRecordWithDLSN record; - while (!nextRequest.hasReadEnoughRecords()) { - // read single record - do { - record = readNextRecord(); - } while (null != record && (record.isControl() || (record.getDlsn().compareTo(getStartDLSN()) < 0))); - if (null == record) { - break; - } else { - if (record.isEndOfStream() && !returnEndOfStreamRecord) { - setLastException(new EndOfStreamException("End of Stream Reached for " - + readHandler.getFullyQualifiedName())); - break; - } - - // gap detection - if (recordPositionsContainsGap(record, lastPosition)) { - bkDistributedLogManager.raiseAlert("Gap detected between records at record = {}", record); - if (positionGapDetectionEnabled) { - throw new DLIllegalStateException("Gap detected between records at record = " + record); - } - } - lastPosition = record.getLastPositionWithinLogSegment(); - - nextRequest.addRecord(record); - } - }; - } catch (IOException exc) { - setLastException(exc); - if (!(exc instanceof LogNotFoundException)) { - LOG.warn("{} : read with skip Exception", readHandler.getFullyQualifiedName(), lastException.get()); - } - continue; - } - - if (nextRequest.hasReadRecords()) { - long remainingWaitTime = nextRequest.getRemainingWaitTime(); - if (remainingWaitTime > 0 && !nextRequest.hasReadEnoughRecords()) { - backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); - scheduleDelayStopwatch.reset().start(); - scheduleCount.set(0); - // the request could still wait for more records - backgroundScheduleTask = executorService.schedule(BACKGROUND_READ_SCHEDULER, remainingWaitTime, nextRequest.deadlineTimeUnit); - return; - } - - PendingReadRequest request = pendingRequests.poll(); - if (null != request && nextRequest == request) { - request.complete(); - if (null != backgroundScheduleTask) { - backgroundScheduleTask.cancel(true); - backgroundScheduleTask = null; - } - } else { - DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at dlsn = " - + nextRequest.records.get(0).getDlsn()); - nextRequest.setException(ise); - if (null != request) { - request.setException(ise); - } - // We should never get here as we should have exited the loop if - // pendingRequests were empty - bkDistributedLogManager.raiseAlert("Unexpected condition at dlsn = {}", - nextRequest.records.get(0).getDlsn()); - setLastException(ise); - } - } else { - if (0 == scheduleCountLocal) { - LOG.trace("Schedule count dropping to zero", lastException.get()); - backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); - return; - } - scheduleCountLocal = scheduleCount.decrementAndGet(); - } - } - } - } - - private boolean recordPositionsContainsGap(LogRecordWithDLSN record, long lastPosition) { - final boolean firstLogRecord = (1 == record.getPositionWithinLogSegment()); - final boolean endOfStreamRecord = record.isEndOfStream(); - final boolean emptyLogSegment = (0 == lastPosition); - final boolean positionIncreasedByOne = (record.getPositionWithinLogSegment() == (lastPosition + 1)); - - return !firstLogRecord && !endOfStreamRecord && !emptyLogSegment && - !positionIncreasedByOne; - } - - /** - * Triggered when the background activity encounters an exception - */ - @Override - public void notifyOnError(Throwable cause) { - if (cause instanceof IOException) { - setLastException((IOException) cause); - } else { - setLastException(new IOException(cause)); - } - scheduleBackgroundRead(); - } - - /** - * Triggered when the background activity completes an operation - */ - @Override - public void notifyOnOperationComplete() { - scheduleBackgroundRead(); - } - - @VisibleForTesting - void simulateErrors() { - bkDistributedLogManager.getFailureInjector().injectErrors(true); - } - - @VisibleForTesting - synchronized void disableReadAheadLogSegmentsNotification() { - readHandler.disableReadAheadLogSegmentsNotification(); - } - - @VisibleForTesting - synchronized void disableProcessingReadRequests() { - disableProcessingReadRequests = true; - } -} - http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java index 4963787..219c0cf 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java @@ -98,7 +98,7 @@ import java.util.concurrent.TimeUnit; * <li> `log_writer/*`: all asynchronous writer related metrics are exposed under scope `log_writer`. * See {@link BKAsyncLogWriter} for detail stats. * <li> `async_reader/*`: all asyncrhonous reader related metrics are exposed under scope `async_reader`. - * See {@link BKAsyncLogReaderDLSN} for detail stats. + * See {@link BKAsyncLogReader} for detail stats. * <li> `writer_future_pool/*`: metrics about the future pools that used by writers are exposed under * scope `writer_future_pool`. See {@link MonitoredFuturePool} for detail stats. * <li> `reader_future_pool/*`: metrics about the future pools that used by readers are exposed under @@ -955,7 +955,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL @Override public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) { Optional<String> subscriberId = Optional.absent(); - AsyncLogReader reader = new BKAsyncLogReaderDLSN( + AsyncLogReader reader = new BKAsyncLogReader( this, scheduler, fromDLSN, @@ -993,7 +993,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL if (!fromDLSN.isPresent() && !subscriberId.isPresent()) { return Future.exception(new UnexpectedException("Neither from dlsn nor subscriber id is provided.")); } - final BKAsyncLogReaderDLSN reader = new BKAsyncLogReaderDLSN( + final BKAsyncLogReader reader = new BKAsyncLogReader( BKDistributedLogManager.this, scheduler, fromDLSN.isPresent() ? fromDLSN.get() : DLSN.InitialDLSN, @@ -1077,7 +1077,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL throws IOException { LOG.info("Create sync reader starting from {}", fromDLSN); checkClosedOrInError("getInputStream"); - return new BKSyncLogReaderDLSN( + return new BKSyncLogReader( conf, this, fromDLSN, http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java new file mode 100644 index 0000000..308f42a --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Ticker; +import com.twitter.distributedlog.exceptions.EndOfStreamException; +import com.twitter.distributedlog.exceptions.IdleReaderException; +import com.twitter.distributedlog.util.FutureUtils; +import com.twitter.distributedlog.util.Utils; +import com.twitter.util.Future; +import com.twitter.util.Promise; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.versioning.Versioned; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Synchronous Log Reader based on {@link AsyncLogReader} + */ +class BKSyncLogReader implements LogReader, AsyncNotification { + + private final BKDistributedLogManager bkdlm; + private final BKLogReadHandler readHandler; + private final AtomicReference<IOException> readerException = + new AtomicReference<IOException>(null); + private final int maxReadAheadWaitTime; + private Promise<Void> closeFuture; + private final Optional<Long> startTransactionId; + private boolean positioned = false; + private Entry.Reader currentEntry = null; + + // readahead reader + ReadAheadEntryReader readAheadReader = null; + + // idle reader settings + private final boolean shouldCheckIdleReader; + private final int idleErrorThresholdMillis; + + // Stats + private final Counter idleReaderError; + + BKSyncLogReader(DistributedLogConfiguration conf, + BKDistributedLogManager bkdlm, + DLSN startDLSN, + Optional<Long> startTransactionId, + StatsLogger statsLogger) throws IOException { + this.bkdlm = bkdlm; + this.readHandler = bkdlm.createReadHandler( + Optional.<String>absent(), + this, + true); + this.maxReadAheadWaitTime = conf.getReadAheadWaitTime(); + this.idleErrorThresholdMillis = conf.getReaderIdleErrorThresholdMillis(); + this.shouldCheckIdleReader = idleErrorThresholdMillis > 0 && idleErrorThresholdMillis < Integer.MAX_VALUE; + this.startTransactionId = startTransactionId; + + // start readahead + startReadAhead(startDLSN); + if (!startTransactionId.isPresent()) { + positioned = true; + } + + // Stats + StatsLogger syncReaderStatsLogger = statsLogger.scope("sync_reader"); + idleReaderError = syncReaderStatsLogger.getCounter("idle_reader_error"); + } + + private void startReadAhead(DLSN startDLSN) throws IOException { + readAheadReader = new ReadAheadEntryReader( + bkdlm.getStreamName(), + startDLSN, + bkdlm.getConf(), + readHandler, + bkdlm.getReaderEntryStore(), + bkdlm.getScheduler(), + Ticker.systemTicker(), + bkdlm.alertStatsLogger); + readHandler.registerListener(readAheadReader); + readHandler.asyncStartFetchLogSegments() + .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() { + @Override + public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) { + readAheadReader.addStateChangeNotification(BKSyncLogReader.this); + readAheadReader.start(logSegments.getValue()); + return BoxedUnit.UNIT; + } + }); + } + + @VisibleForTesting + ReadAheadEntryReader getReadAheadReader() { + return readAheadReader; + } + + @VisibleForTesting + BKLogReadHandler getReadHandler() { + return readHandler; + } + + private Entry.Reader readNextEntry(boolean nonBlocking) throws IOException { + Entry.Reader entry = null; + if (nonBlocking) { + return readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS); + } else { + while (!readAheadReader.isReadAheadCaughtUp() + && null == readerException.get() + && null == entry) { + entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS); + } + if (null != entry) { + return entry; + } + // reader is caught up + if (readAheadReader.isReadAheadCaughtUp() + && null == readerException.get()) { + entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS); + } + return entry; + } + } + + private void markReaderAsIdle() throws IdleReaderException { + idleReaderError.inc(); + IdleReaderException ire = new IdleReaderException("Sync reader on stream " + + readHandler.getFullyQualifiedName() + + " is idle for more than " + idleErrorThresholdMillis + " ms"); + readerException.compareAndSet(null, ire); + throw ire; + } + + @Override + public synchronized LogRecordWithDLSN readNext(boolean nonBlocking) + throws IOException { + if (null != readerException.get()) { + throw readerException.get(); + } + LogRecordWithDLSN record = doReadNext(nonBlocking); + // no record is returned, check if the reader becomes idle + if (null == record && shouldCheckIdleReader) { + if (readAheadReader.getNumCachedEntries() <= 0 && + readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) { + markReaderAsIdle(); + } + } + return record; + } + + private LogRecordWithDLSN doReadNext(boolean nonBlocking) throws IOException { + LogRecordWithDLSN record = null; + + do { + // fetch one record until we don't find any entry available in the readahead cache + while (null == record) { + if (null == currentEntry) { + currentEntry = readNextEntry(nonBlocking); + if (null == currentEntry) { + return null; + } + } + record = currentEntry.nextRecord(); + if (null == record) { + currentEntry = null; + } + } + + // check if we reached the end of stream + if (record.isEndOfStream()) { + EndOfStreamException eos = new EndOfStreamException("End of Stream Reached for " + + readHandler.getFullyQualifiedName()); + readerException.compareAndSet(null, eos); + throw eos; + } + // skip control records + if (record.isControl()) { + record = null; + continue; + } + if (!positioned) { + if (record.getTransactionId() < startTransactionId.get()) { + record = null; + continue; + } else { + positioned = true; + break; + } + } else { + break; + } + } while (true); + return record; + } + + @Override + public synchronized List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) + throws IOException { + LinkedList<LogRecordWithDLSN> retList = + new LinkedList<LogRecordWithDLSN>(); + + int numRead = 0; + LogRecordWithDLSN record = readNext(nonBlocking); + while ((null != record)) { + retList.add(record); + numRead++; + if (numRead >= numLogRecords) { + break; + } + record = readNext(nonBlocking); + } + return retList; + } + + @Override + public Future<Void> asyncClose() { + Promise<Void> closePromise; + synchronized (this) { + if (null != closeFuture) { + return closeFuture; + } + closeFuture = closePromise = new Promise<Void>(); + } + readHandler.unregisterListener(readAheadReader); + readAheadReader.removeStateChangeNotification(this); + Utils.closeSequence(bkdlm.getScheduler(), true, + readAheadReader, + readHandler + ).proxyTo(closePromise); + return closePromise; + } + + @Override + public void close() throws IOException { + FutureUtils.result(asyncClose()); + } + + // + // Notification From ReadHandler + // + + @Override + public void notifyOnError(Throwable cause) { + if (cause instanceof IOException) { + readerException.compareAndSet(null, (IOException) cause); + } else { + readerException.compareAndSet(null, new IOException(cause)); + } + } + + @Override + public void notifyOnOperationComplete() { + // no-op + } +}