http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
new file mode 100644
index 0000000..eedfbd6
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
@@ -0,0 +1,751 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import org.apache.distributedlog.exceptions.DLIllegalStateException;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.exceptions.EndOfStreamException;
+import org.apache.distributedlog.exceptions.IdleReaderException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.exceptions.ReadCancelledException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.Utils;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.Throw;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Function1;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+/**
+ * BookKeeper based {@link AsyncLogReader} implementation.
+ *
+ * <h3>Metrics</h3>
+ * All the metrics are exposed under `async_reader`.
+ * <ul>
+ * <li> `async_reader`/future_set: opstats. time spent on satisfying futures of read requests.
+ * if it is high, it means that the caller is slow to process the results of read requests.
+ * The side effect is blocking subsequent reads.
+ * <li> `async_reader`/schedule: opstats. time spent on scheduling next reads.
+ * <li> `async_reader`/background_read: opstats. time spent on background reads.
+ * <li> `async_reader`/read_next_exec: opstats. time spent on executing {@link #readNext()}.
+ * <li> `async_reader`/time_between_read_next: opstats. time spent between two consecutive {@link #readNext()} calls.
+ * if it is high, it means that the caller is slowing down on calling {@link #readNext()}.
+ * <li> `async_reader`/delay_until_promise_satisfied: opstats. total latency for the read requests.
+ * <li> `async_reader`/idle_reader_error: counter. the number of idle reader errors.
+ * </ul>
+ */
+class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification {
+    static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReader.class);
+
+    private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION =
+            new AbstractFunction1<List<LogRecordWithDLSN>, LogRecordWithDLSN>() {
+                @Override
+                public LogRecordWithDLSN apply(List<LogRecordWithDLSN> records) {
+                    return records.get(0);
+                }
+            };
+
+    private final String streamName;
+    protected final BKDistributedLogManager bkDistributedLogManager;
+    protected final BKLogReadHandler readHandler;
+    private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
+    private final OrderedScheduler scheduler;
+    private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>();
+    private final Object scheduleLock = new Object();
+    private final AtomicLong scheduleCount = new AtomicLong(0);
+    private final Stopwatch scheduleDelayStopwatch;
+    private final Stopwatch readNextDelayStopwatch;
+    private DLSN startDLSN;
+    private ReadAheadEntryReader readAheadReader = null;
+    private int lastPosition = 0;
+    private final boolean positionGapDetectionEnabled;
+    private final int idleErrorThresholdMillis;
+    final ScheduledFuture<?> idleReaderTimeoutTask;
+    private ScheduledFuture<?> backgroundScheduleTask = null;
+    // last process time
+    private final Stopwatch lastProcessTime;
+
+    protected Promise<Void> closeFuture = null;
+
+    private boolean lockStream = false;
+
+    private final boolean returnEndOfStreamRecord;
+
+    private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() {
+        @Override
+        public void run() {
+            synchronized (scheduleLock) {
+                backgroundScheduleTask = null;
+            }
+            scheduleBackgroundRead();
+        }
+    };
+
+    // State
+    private Entry.Reader currentEntry = null;
+    private LogRecordWithDLSN nextRecord = null;
+
+    // Failure Injector
+    private boolean disableProcessingReadRequests = false;
+
+    // Stats
+    private final OpStatsLogger readNextExecTime;
+    private final OpStatsLogger delayUntilPromiseSatisfied;
+    private final OpStatsLogger timeBetweenReadNexts;
+    private final OpStatsLogger futureSetLatency;
+    private final OpStatsLogger scheduleLatency;
+    private final OpStatsLogger backgroundReaderRunTime;
+    private final Counter idleReaderCheckCount;
+    private final Counter idleReaderCheckIdleReadRequestCount;
+    private final Counter idleReaderCheckIdleReadAheadCount;
+    private final Counter idleReaderError;
+
+    private class PendingReadRequest {
+        private final Stopwatch enqueueTime;
+        private final int numEntries;
+        private final List<LogRecordWithDLSN> records;
+        private final Promise<List<LogRecordWithDLSN>> promise;
+        private final long deadlineTime;
+        private final TimeUnit deadlineTimeUnit;
+
+        PendingReadRequest(int numEntries,
+                           long deadlineTime,
+                           TimeUnit deadlineTimeUnit) {
+            this.numEntries = numEntries;
+            this.enqueueTime =
+                    Stopwatch.createStarted();
+            // optimize the space usage for single read.
+            if (numEntries == 1) {
+                this.records = new ArrayList<LogRecordWithDLSN>(1);
+            } else {
+                this.records = new ArrayList<LogRecordWithDLSN>();
+            }
+            this.promise = new Promise<List<LogRecordWithDLSN>>();
+            this.deadlineTime = deadlineTime;
+            this.deadlineTimeUnit = deadlineTimeUnit;
+        }
+
+        Promise<List<LogRecordWithDLSN>> getPromise() {
+            return promise;
+        }
+
+        long elapsedSinceEnqueue(TimeUnit timeUnit) {
+            return enqueueTime.elapsed(timeUnit);
+        }
+
+        void setException(Throwable throwable) {
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            if (promise.updateIfEmpty(new Throw<List<LogRecordWithDLSN>>(throwable))) {
+                futureSetLatency.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS));
+            }
+        }
+
+        boolean hasReadRecords() {
+            return records.size() > 0;
+        }
+
+        boolean hasReadEnoughRecords() {
+            return records.size() >= numEntries;
+        }
+
+        long getRemainingWaitTime() {
+            if (deadlineTime <= 0L) {
+                return 0L;
+            }
+            return deadlineTime - elapsedSinceEnqueue(deadlineTimeUnit);
+        }
+
+        void addRecord(LogRecordWithDLSN record) {
+            records.add(record);
+        }
+
+        void complete() {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size());
+            }
+            delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS));
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            promise.setValue(records);
+            futureSetLatency.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+        }
+    }
+
+    BKAsyncLogReader(BKDistributedLogManager bkdlm,
+                     OrderedScheduler scheduler,
+                     DLSN startDLSN,
+                     Optional<String> subscriberId,
+                     boolean returnEndOfStreamRecord,
+                     StatsLogger statsLogger) {
+        this.streamName = bkdlm.getStreamName();
+        this.bkDistributedLogManager = bkdlm;
+        this.scheduler = scheduler;
+        this.readHandler = bkDistributedLogManager.createReadHandler(subscriberId,
+                this, true);
+        LOG.debug("Starting async reader at {}", startDLSN);
+        this.startDLSN = startDLSN;
+        this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
+        this.readNextDelayStopwatch = Stopwatch.createStarted();
+        this.positionGapDetectionEnabled = bkdlm.getConf().getPositionGapDetectionEnabled();
+        this.idleErrorThresholdMillis = bkdlm.getConf().getReaderIdleErrorThresholdMillis();
+        this.returnEndOfStreamRecord = returnEndOfStreamRecord;
+
+        // Stats
+        StatsLogger asyncReaderStatsLogger = statsLogger.scope("async_reader");
+        futureSetLatency = asyncReaderStatsLogger.getOpStatsLogger("future_set");
+        scheduleLatency = asyncReaderStatsLogger.getOpStatsLogger("schedule");
+        backgroundReaderRunTime = asyncReaderStatsLogger.getOpStatsLogger("background_read");
+        readNextExecTime = asyncReaderStatsLogger.getOpStatsLogger("read_next_exec");
+        timeBetweenReadNexts = asyncReaderStatsLogger.getOpStatsLogger("time_between_read_next");
+        delayUntilPromiseSatisfied = asyncReaderStatsLogger.getOpStatsLogger("delay_until_promise_satisfied");
+        idleReaderError = asyncReaderStatsLogger.getCounter("idle_reader_error");
+        idleReaderCheckCount = asyncReaderStatsLogger.getCounter("idle_reader_check_total");
+        idleReaderCheckIdleReadRequestCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_read_requests");
+        idleReaderCheckIdleReadAheadCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_readahead");
+
+        // Lock the
+        // stream if requested. The lock will be released when the reader is closed.
+        this.lockStream = false;
+        this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary();
+        this.lastProcessTime = Stopwatch.createStarted();
+    }
+
+    private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
+        if (idleErrorThresholdMillis < Integer.MAX_VALUE) {
+            // Don't run the task more than once every second (for sanity)
+            long period = Math.max(idleErrorThresholdMillis / 10, 1000);
+            // Except when the idle reader threshold is less than a second (tests?)
+            period = Math.min(period, idleErrorThresholdMillis / 5);
+
+            return scheduler.scheduleAtFixedRate(streamName, new Runnable() {
+                @Override
+                public void run() {
+                    PendingReadRequest nextRequest = pendingRequests.peek();
+
+                    idleReaderCheckCount.inc();
+                    if (null == nextRequest) {
+                        return;
+                    }
+
+                    idleReaderCheckIdleReadRequestCount.inc();
+                    if (nextRequest.elapsedSinceEnqueue(TimeUnit.MILLISECONDS) < idleErrorThresholdMillis) {
+                        return;
+                    }
+
+                    ReadAheadEntryReader readAheadReader = getReadAheadReader();
+
+                    // the read request has been idle:
+                    // - the cache has records but the read request is idle,
+                    //   which means a notification was missed between readahead and reader.
+                    // - the cache is empty and readahead is idle (no records added for a long time)
+                    idleReaderCheckIdleReadAheadCount.inc();
+                    try {
+                        if (null == readAheadReader || (!hasMoreRecords() &&
+                                readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS))) {
+                            markReaderAsIdle();
+                            return;
+                        } else if (lastProcessTime.elapsed(TimeUnit.MILLISECONDS) > idleErrorThresholdMillis) {
+                            markReaderAsIdle();
+                        }
+                    } catch (IOException e) {
+                        setLastException(e);
+                        return;
+                    }
+                }
+            }, period, period, TimeUnit.MILLISECONDS);
+        }
+        return null;
+    }
+
+    synchronized ReadAheadEntryReader getReadAheadReader() {
+        return readAheadReader;
+    }
+
+    void cancelIdleReaderTask() {
+        // Do this after we have checked that the reader was not previously closed
+        try {
+            if (null != idleReaderTimeoutTask) {
+                idleReaderTimeoutTask.cancel(true);
+            }
+        } catch (Exception exc) {
+            LOG.info("{}: Failed to cancel the background idle reader timeout task", readHandler.getFullyQualifiedName());
+        }
+    }
+
+    private void markReaderAsIdle() {
+        idleReaderError.inc();
+        IdleReaderException ire = new IdleReaderException("Reader on stream "
+                + readHandler.getFullyQualifiedName()
+                + " is idle for " + idleErrorThresholdMillis + " ms");
+        setLastException(ire);
+        // cancel all pending reads directly rather than notifying on error,
+        // because an idle reader is usually caused by idle read requests, which
+        // means something is wrong in scheduling reads
+        cancelAllPendingReads(ire);
+    }
+
+    protected synchronized void setStartDLSN(DLSN fromDLSN) throws UnexpectedException {
+        if (null != readAheadReader) {
+            throw new UnexpectedException("Couldn't reset fromDLSN after the reader has already started reading.");
+        }
+        startDLSN = fromDLSN;
+    }
+
+    @VisibleForTesting
+    public synchronized DLSN getStartDLSN() {
+        return startDLSN;
+    }
+
+    public Future<Void> lockStream() {
+        this.lockStream = true;
+        return readHandler.lockStream();
+    }
+
+    private boolean checkClosedOrInError(String operation) {
+        if (null == lastException.get()) {
+            try {
+                if (null != readHandler && null != getReadAheadReader()) {
+                    getReadAheadReader().checkLastException();
+                }
+
+                bkDistributedLogManager.checkClosedOrInError(operation);
+            } catch (IOException exc) {
+                setLastException(exc);
+            }
+        }
+
+        if (lockStream) {
+            try {
+                readHandler.checkReadLock();
+            } catch
+            (IOException ex) {
+                setLastException(ex);
+            }
+        }
+
+        if (null != lastException.get()) {
+            LOG.trace("Cancelling pending reads");
+            cancelAllPendingReads(lastException.get());
+            return true;
+        }
+
+        return false;
+    }
+
+    private void setLastException(IOException exc) {
+        lastException.compareAndSet(null, exc);
+    }
+
+    @Override
+    public String getStreamName() {
+        return streamName;
+    }
+
+    /**
+     * @return A promise that when satisfied will contain the Log Record with its DLSN.
+     */
+    @Override
+    public synchronized Future<LogRecordWithDLSN> readNext() {
+        return readInternal(1, 0, TimeUnit.MILLISECONDS).map(READ_NEXT_MAP_FUNCTION);
+    }
+
+    public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries) {
+        return readInternal(numEntries, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries,
+                                                                 long waitTime,
+                                                                 TimeUnit timeUnit) {
+        return readInternal(numEntries, waitTime, timeUnit);
+    }
+
+    /**
+     * Read up to <i>numEntries</i> entries. The future will be satisfied when any number of entries are
+     * ready (1 to <i>numEntries</i>).
+     *
+     * @param numEntries
+     *          num entries to read
+     * @return A promise that is satisfied with a non-empty list of log records with their DLSN.
+     */
+    private synchronized Future<List<LogRecordWithDLSN>> readInternal(int numEntries,
+                                                                      long deadlineTime,
+                                                                      TimeUnit deadlineTimeUnit) {
+        timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
+        readNextDelayStopwatch.reset().start();
+        final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit);
+
+        if (null == readAheadReader) {
+            final ReadAheadEntryReader readAheadEntryReader = this.readAheadReader = new ReadAheadEntryReader(
+                    getStreamName(),
+                    getStartDLSN(),
+                    bkDistributedLogManager.getConf(),
+                    readHandler,
+                    bkDistributedLogManager.getReaderEntryStore(),
+                    bkDistributedLogManager.getScheduler(),
+                    Ticker.systemTicker(),
+                    bkDistributedLogManager.alertStatsLogger);
+            readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() {
+                @Override
+                public void onSuccess(Void value) {
+                    try {
+                        readHandler.registerListener(readAheadEntryReader);
+                        readHandler.asyncStartFetchLogSegments()
+                                .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
+                                    @Override
+                                    public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
+                                        readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this);
+                                        readAheadEntryReader.start(logSegments.getValue());
+                                        return BoxedUnit.UNIT;
+                                    }
+                                });
+                    } catch (Exception exc) {
+                        notifyOnError(exc);
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable cause) {
+                    notifyOnError(cause);
+                }
+            });
+        }
+
+        if (checkClosedOrInError("readNext")) {
+            readRequest.setException(lastException.get());
+        } else {
+            boolean queueEmpty = pendingRequests.isEmpty();
+            pendingRequests.add(readRequest);
+
+            if (queueEmpty) {
+                scheduleBackgroundRead();
+            }
+        }
+
+        readNextExecTime.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
+        readNextDelayStopwatch.reset().start();
+
+        return readRequest.getPromise();
+    }
+
+    public synchronized void scheduleBackgroundRead() {
+        // if the reader is already closed, we don't need to schedule background read again.
+        if (null != closeFuture) {
+            return;
+        }
+
+        long prevCount = scheduleCount.getAndIncrement();
+        if (0 == prevCount) {
+            scheduleDelayStopwatch.reset().start();
+            scheduler.submit(streamName, this);
+        }
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        // Cancel the idle reader timeout task, interrupting if necessary
+        ReadCancelledException exception;
+        Promise<Void> closePromise;
+        synchronized (this) {
+            if (null != closeFuture) {
+                return closeFuture;
+            }
+            closePromise = closeFuture = new Promise<Void>();
+            exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed");
+            setLastException(exception);
+        }
+
+        // Do this after we have checked that the reader was not previously closed
+        cancelIdleReaderTask();
+
+        synchronized (scheduleLock) {
+            if (null != backgroundScheduleTask) {
+                backgroundScheduleTask.cancel(true);
+            }
+        }
+
+        cancelAllPendingReads(exception);
+
+        ReadAheadEntryReader readAheadReader = getReadAheadReader();
+        if (null != readAheadReader) {
+            readHandler.unregisterListener(readAheadReader);
+            readAheadReader.removeStateChangeNotification(this);
+        }
+        Utils.closeSequence(bkDistributedLogManager.getScheduler(), true,
+                readAheadReader,
+                readHandler
+        ).proxyTo(closePromise);
+        return closePromise;
+    }
+
+    private void cancelAllPendingReads(Throwable throwExc) {
+        for (PendingReadRequest promise : pendingRequests) {
+            promise.setException(throwExc);
+        }
+        pendingRequests.clear();
+    }
+
+    synchronized boolean hasMoreRecords() throws IOException {
+        if (null == readAheadReader) {
+            return false;
+        }
+        if (readAheadReader.getNumCachedEntries() > 0 || null != nextRecord) {
+            return true;
+        } else if (null != currentEntry) {
+            nextRecord = currentEntry.nextRecord();
+            return null != nextRecord;
+        }
+        return false;
+    }
+
+    private synchronized LogRecordWithDLSN readNextRecord() throws IOException {
+        if (null == readAheadReader) {
+            return null;
+        }
+        if (null == currentEntry) {
+            currentEntry = readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
+            // no entry available from the read ahead cache, so return null
+            if (null == currentEntry) {
+                return null;
+            }
+        }
+
+        LogRecordWithDLSN recordToReturn;
+        if (null == nextRecord) {
+            nextRecord = currentEntry.nextRecord();
+            // no more records in the current entry
+            if (null == nextRecord) {
+                currentEntry = null;
+                return readNextRecord();
+            }
+        }
+
+        // found a record to return and prefetch the next one
+        recordToReturn = nextRecord;
+        nextRecord = currentEntry.nextRecord();
+        return recordToReturn;
+    }
+
+    @Override
+    public void run() {
+        synchronized (scheduleLock) {
+            if (scheduleDelayStopwatch.isRunning()) {
+                scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            }
+
+            Stopwatch runTime = Stopwatch.createStarted();
+            int iterations = 0;
+            long scheduleCountLocal = scheduleCount.get();
+            LOG.debug("{}: Scheduled Background Reader", readHandler.getFullyQualifiedName());
+            while (true) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("{}: Executing Iteration: {}", readHandler.getFullyQualifiedName(), iterations++);
+                }
+
+                PendingReadRequest nextRequest = null;
+                synchronized (this) {
+                    nextRequest = pendingRequests.peek();
+
+                    // Queue is empty, nothing to read, return
+                    if (null == nextRequest) {
+                        LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName());
+                        scheduleCount.set(0);
+                        backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                        return;
+                    }
+
+                    if
+                    (disableProcessingReadRequests) {
+                        LOG.info("Reader of {} is forced to stop processing read requests", readHandler.getFullyQualifiedName());
+                        return;
+                    }
+                }
+                lastProcessTime.reset().start();
+
+                // If the oldest pending promise is interrupted then we must mark
+                // the reader in error and abort all pending reads since we don't
+                // know the last consumed read
+                if (null == lastException.get()) {
+                    if (nextRequest.getPromise().isInterrupted().isDefined()) {
+                        setLastException(new DLInterruptedException("Interrupted on reading " + readHandler.getFullyQualifiedName() + " : ",
+                                nextRequest.getPromise().isInterrupted().get()));
+                    }
+                }
+
+                if (checkClosedOrInError("readNext")) {
+                    if (!(lastException.get().getCause() instanceof LogNotFoundException)) {
+                        LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get());
+                    }
+                    backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                    return;
+                }
+
+                try {
+                    // Fail 10% of the requests when asked to simulate errors
+                    if (bkDistributedLogManager.getFailureInjector().shouldInjectErrors()) {
+                        throw new IOException("Reader Simulated Exception");
+                    }
+                    LogRecordWithDLSN record;
+                    while (!nextRequest.hasReadEnoughRecords()) {
+                        // read a single record
+                        do {
+                            record = readNextRecord();
+                        } while (null != record && (record.isControl() || (record.getDlsn().compareTo(getStartDLSN()) < 0)));
+                        if (null == record) {
+                            break;
+                        } else {
+                            if (record.isEndOfStream() && !returnEndOfStreamRecord) {
+                                setLastException(new EndOfStreamException("End of Stream Reached for "
+                                        + readHandler.getFullyQualifiedName()));
+                                break;
+                            }
+
+                            // gap detection
+                            if (recordPositionsContainsGap(record, lastPosition)) {
+                                bkDistributedLogManager.raiseAlert("Gap detected between records at record = {}", record);
+                                if (positionGapDetectionEnabled) {
+                                    throw new DLIllegalStateException("Gap detected between records at record = " + record);
+                                }
+                            }
+                            lastPosition = record.getLastPositionWithinLogSegment();
+
+                            nextRequest.addRecord(record);
+                        }
+                    }
+                } catch (IOException exc) {
+                    setLastException(exc);
+                    if (!(exc instanceof LogNotFoundException)) {
+                        LOG.warn("{} : read with skip Exception", readHandler.getFullyQualifiedName(), lastException.get());
+                    }
+                    continue;
+                }
+
+                if (nextRequest.hasReadRecords()) {
+                    long remainingWaitTime = nextRequest.getRemainingWaitTime();
+                    if (remainingWaitTime > 0 && !nextRequest.hasReadEnoughRecords()) {
+                        backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                        scheduleDelayStopwatch.reset().start();
+                        scheduleCount.set(0);
+                        // the request could still wait for more records
+                        backgroundScheduleTask = scheduler.schedule(
+                                streamName,
+                                BACKGROUND_READ_SCHEDULER,
+                                remainingWaitTime,
+                                nextRequest.deadlineTimeUnit);
+                        return;
+                    }
+
+                    PendingReadRequest request = pendingRequests.poll();
+                    if (null != request && nextRequest == request) {
+                        request.complete();
+                        if (null != backgroundScheduleTask) {
+                            backgroundScheduleTask.cancel(true);
+                            backgroundScheduleTask = null;
+                        }
+                    } else {
+                        DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at dlsn = "
+                                + nextRequest.records.get(0).getDlsn());
+                        nextRequest.setException(ise);
+                        if (null != request) {
+                            request.setException(ise);
+                        }
+                        // We should never get here as we should have exited the loop if
+                        // pendingRequests were empty
+                        bkDistributedLogManager.raiseAlert("Unexpected condition at dlsn = {}",
+                                nextRequest.records.get(0).getDlsn());
+                        setLastException(ise);
+                    }
+                }
+                else {
+                    if (0 == scheduleCountLocal) {
+                        LOG.trace("Schedule count dropping to zero", lastException.get());
+                        backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                        return;
+                    }
+                    scheduleCountLocal = scheduleCount.decrementAndGet();
+                }
+            }
+        }
+    }
+
+    private boolean recordPositionsContainsGap(LogRecordWithDLSN record, long lastPosition) {
+        final boolean firstLogRecord = (1 == record.getPositionWithinLogSegment());
+        final boolean endOfStreamRecord = record.isEndOfStream();
+        final boolean emptyLogSegment = (0 == lastPosition);
+        final boolean positionIncreasedByOne = (record.getPositionWithinLogSegment() == (lastPosition + 1));
+
+        return !firstLogRecord && !endOfStreamRecord && !emptyLogSegment &&
+                !positionIncreasedByOne;
+    }
+
+    /**
+     * Triggered when the background activity encounters an exception.
+     */
+    @Override
+    public void notifyOnError(Throwable cause) {
+        if (cause instanceof IOException) {
+            setLastException((IOException) cause);
+        } else {
+            setLastException(new IOException(cause));
+        }
+        scheduleBackgroundRead();
+    }
+
+    /**
+     * Triggered when the background activity completes an operation.
+     */
+    @Override
+    public void notifyOnOperationComplete() {
+        scheduleBackgroundRead();
+    }
+
+    @VisibleForTesting
+    void simulateErrors() {
+        bkDistributedLogManager.getFailureInjector().injectErrors(true);
+    }
+
+    @VisibleForTesting
+    synchronized void disableReadAheadLogSegmentsNotification() {
+        readHandler.disableReadAheadLogSegmentsNotification();
+    }
+
+    @VisibleForTesting
+    synchronized void disableProcessingReadRequests() {
+        disableProcessingReadRequests = true;
+    }
+}
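For context, here is how the reader above is typically driven from application code: readNext() enqueues a PendingReadRequest, and the scheduled run() loop drains the readahead cache and completes the promise. A minimal sketch, assuming the DistributedLogManager#openAsyncLogReader API and com.twitter.util.Await from this codebase; the wrapper class and the blocking style are hypothetical, not part of this patch:

import com.twitter.util.Await;
import org.apache.distributedlog.AsyncLogReader;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogManager;
import org.apache.distributedlog.LogRecordWithDLSN;

public class ReaderSketch {
    public static void readFirstRecord(DistributedLogManager dlm) throws Exception {
        // opening the reader wires up the BKLogReadHandler; the readahead
        // reader itself is created lazily on the first readNext() call
        AsyncLogReader reader = Await.result(dlm.openAsyncLogReader(DLSN.InitialDLSN));
        try {
            // readNext() enqueues a PendingReadRequest; the background read loop
            // (run() above) completes the promise once the readahead cache has a
            // record, or fails it with IdleReaderException if nothing arrives
            // within the configured reader idle error threshold
            LogRecordWithDLSN record = Await.result(reader.readNext());
            System.out.println("got record at " + record.getDlsn());
        } finally {
            // cancels the idle reader task and any outstanding reads
            Await.result(reader.asyncClose());
        }
    }
}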
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
new file mode 100644
index 0000000..1102ff5
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
@@ -0,0 +1,559 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.StreamNotReadyException;
+import org.apache.distributedlog.exceptions.WriteCancelledException;
+import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.feature.CoreFeatureKeys;
+import org.apache.distributedlog.util.FailpointUtils;
+import org.apache.distributedlog.util.FutureUtils;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.Try;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Function1;
+import scala.Option;
+import scala.runtime.AbstractFunction1;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * BookKeeper based {@link AsyncLogWriter} implementation.
+ *
+ * <h3>Metrics</h3>
+ * All the metrics are exposed under `log_writer`.
+ * <ul>
+ * <li> `log_writer/write`: opstats. latency characteristics about the time that write operations spent.
+ * <li> `log_writer/bulk_write`: opstats. latency characteristics about the time that bulk_write
+ * operations spent. a high value can mean that operations
+ * are pending in the queue for a long time due to log segment rolling.
+ * <li> `log_writer/get_writer`: opstats. the time spent on getting the writer. it could spike when
+ * log segment rolling happens while getting the writer. it is a good stat to look into when the latency
+ * is caused by queuing time.
+ * <li> `log_writer/pending_request_dispatch`: counter. the number of queued operations that are dispatched
+ * after the log segment is rolled.
+ * it is a metric measuring how many operations have been queued because of
+ * log segment rolling.
+ * </ul>
+ * See {@link BKLogSegmentWriter} for segment writer stats.
+ */
+public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
+
+    static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogWriter.class);
+
+    static Function1<List<LogSegmentMetadata>, Boolean> TruncationResultConverter =
+            new AbstractFunction1<List<LogSegmentMetadata>, Boolean>() {
+                @Override
+                public Boolean apply(List<LogSegmentMetadata> segments) {
+                    return true;
+                }
+            };
+
+    // Records pending while the log segment is rolling.
+    class PendingLogRecord implements FutureEventListener<DLSN> {
+
+        final LogRecord record;
+        final Promise<DLSN> promise;
+        final boolean flush;
+
+        PendingLogRecord(LogRecord record, boolean flush) {
+            this.record = record;
+            this.promise = new Promise<DLSN>();
+            this.flush = flush;
+        }
+
+        @Override
+        public void onSuccess(DLSN value) {
+            promise.setValue(value);
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            promise.setException(cause);
+            encounteredError = true;
+        }
+    }
+
+    /**
+     * Last pending record in the current log segment. After it is satisfied, the writer
+     * rolls the log segment.
+     *
+     * This implementation is based on the assumption that all futures are satisfied
+     * in order by the same future pool.
+     */
+    class LastPendingLogRecord extends PendingLogRecord {
+
+        LastPendingLogRecord(LogRecord record, boolean flush) {
+            super(record, flush);
+        }
+
+        @Override
+        public void onSuccess(DLSN value) {
+            super.onSuccess(value);
+            // roll the log segment and issue all pending requests.
+            rollLogSegmentAndIssuePendingRequests(record.getTransactionId());
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            super.onFailure(cause);
+            // error out pending requests.
+            errorOutPendingRequestsAndWriter(cause);
+        }
+    }
+
+    private final boolean streamFailFast;
+    private final boolean disableRollOnSegmentError;
+    private LinkedList<PendingLogRecord> pendingRequests = null;
+    private volatile boolean encounteredError = false;
+    private Promise<BKLogSegmentWriter> rollingFuture = null;
+    private long lastTxId = DistributedLogConstants.INVALID_TXID;
+
+    private final StatsLogger statsLogger;
+    private final OpStatsLogger writeOpStatsLogger;
+    private final OpStatsLogger markEndOfStreamOpStatsLogger;
+    private final OpStatsLogger bulkWriteOpStatsLogger;
+    private final OpStatsLogger getWriterOpStatsLogger;
+    private final Counter pendingRequestDispatch;
+
+    private final Feature disableLogSegmentRollingFeature;
+
+    BKAsyncLogWriter(DistributedLogConfiguration conf,
+                     DynamicDistributedLogConfiguration dynConf,
+                     BKDistributedLogManager bkdlm,
+                     BKLogWriteHandler writeHandler, /** log writer owns the handler **/
+                     FeatureProvider featureProvider,
+                     StatsLogger dlmStatsLogger) {
+        super(conf, dynConf, bkdlm);
+        this.writeHandler = writeHandler;
+        this.streamFailFast = conf.getFailFastOnStreamNotReady();
+        this.disableRollOnSegmentError = conf.getDisableRollingOnLogSegmentError();
+
+        // features
+        disableLogSegmentRollingFeature = featureProvider.getFeature(CoreFeatureKeys.DISABLE_LOGSEGMENT_ROLLING.name().toLowerCase());
+        // stats
+        this.statsLogger = dlmStatsLogger.scope("log_writer");
+        this.writeOpStatsLogger = statsLogger.getOpStatsLogger("write");
+        this.markEndOfStreamOpStatsLogger = statsLogger.getOpStatsLogger("mark_end_of_stream");
+        this.bulkWriteOpStatsLogger = statsLogger.getOpStatsLogger("bulk_write");
+        this.getWriterOpStatsLogger = statsLogger.getOpStatsLogger("get_writer");
+        this.pendingRequestDispatch = statsLogger.getCounter("pending_request_dispatch");
+    }
+
+    @VisibleForTesting
+    synchronized void setLastTxId(long txId) {
+        lastTxId = Math.max(lastTxId, txId);
+    }
+
+    @Override
+    public synchronized long getLastTxId() {
+        return lastTxId;
+    }
+
+    /**
+     * Write a log record as a control record. The method will be used by the Monitor Service
+     * to enforce a new inprogress segment.
+     *
+     * @param record
+     *          log record
+     * @return future of the write
+     */
+    public Future<DLSN> writeControlRecord(final LogRecord record) {
+        record.setControl();
+        return write(record);
+    }
+
+    private BKLogSegmentWriter getCachedLogSegmentWriter() throws WriteException {
+        if (encounteredError) {
+            throw new WriteException(bkDistributedLogManager.getStreamName(),
+                    "writer has been closed due to error.");
+        }
+        BKLogSegmentWriter segmentWriter = getCachedLogWriter();
+        if (null != segmentWriter
+                && segmentWriter.isLogSegmentInError()
+                && !disableRollOnSegmentError) {
+            return null;
+        } else {
+            return segmentWriter;
+        }
+    }
+
+    private Future<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid,
+                                                           boolean bestEffort,
+                                                           boolean rollLog,
+                                                           boolean allowMaxTxID) {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        return FutureUtils.stats(
+                doGetLogSegmentWriter(firstTxid, bestEffort, rollLog, allowMaxTxID),
+                getWriterOpStatsLogger,
+                stopwatch);
+    }
+
+    private Future<BKLogSegmentWriter> doGetLogSegmentWriter(final long firstTxid,
+                                                             final boolean bestEffort,
+                                                             final boolean rollLog,
+                                                             final boolean allowMaxTxID) {
+        if (encounteredError) {
+            return Future.exception(new WriteException(bkDistributedLogManager.getStreamName(),
+                    "writer has been closed due to error."));
+        }
+        Future<BKLogSegmentWriter> writerFuture = asyncGetLedgerWriter(!disableRollOnSegmentError);
+        if (null == writerFuture) {
+            return rollLogSegmentIfNecessary(null, firstTxid, bestEffort, allowMaxTxID);
+        } else if (rollLog) {
+            return writerFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() {
+                @Override
+                public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter writer) {
+                    return rollLogSegmentIfNecessary(writer, firstTxid, bestEffort, allowMaxTxID);
+                }
+            });
+        } else {
+            return writerFuture;
+        }
+    }
+
+    /**
+     * We write the end-of-stream marker by writing a record with MAX_TXID, so we need to allow
+     * using the max txid when rolling for this case only.
+     */
+    private Future<BKLogSegmentWriter> getLogSegmentWriterForEndOfStream() {
+        return getLogSegmentWriter(DistributedLogConstants.MAX_TXID,
+                false /* bestEffort */,
+                false /* roll log */,
+                true /* allow max txid */);
+    }
+
+    private Future<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid,
+                                                           boolean bestEffort,
+                                                           boolean rollLog) {
+        return getLogSegmentWriter(firstTxid, bestEffort, rollLog, false /* allow max txid */);
+    }
+
+    Future<DLSN> queueRequest(LogRecord record, boolean flush) {
+        PendingLogRecord pendingLogRecord = new PendingLogRecord(record, flush);
+        pendingRequests.add(pendingLogRecord);
+        return pendingLogRecord.promise;
+    }
+
+    boolean shouldRollLog(BKLogSegmentWriter w) {
+        try {
+            return null == w ||
+                    (!disableLogSegmentRollingFeature.isAvailable() &&
+                            shouldStartNewSegment(w));
+        } catch (IOException ioe) {
+            return false;
+        }
+    }
+
+    void startQueueingRequests() {
+        assert (null == pendingRequests && null == rollingFuture);
+        pendingRequests = new LinkedList<PendingLogRecord>();
+        rollingFuture = new Promise<BKLogSegmentWriter>();
+    }
+
+    // for the ordering guarantee, we shouldn't send requests to the next log segment until
+    // the previous log segment is done.
+    private synchronized Future<DLSN> asyncWrite(final LogRecord record,
+                                                 boolean flush) {
+        // The passed in writer may be stale since we acquire the writer outside of the sync
+        // lock. If we recently rolled and the new writer is cached, use that instead.
+        Future<DLSN> result = null;
+        BKLogSegmentWriter w;
+        try {
+            w = getCachedLogSegmentWriter();
+        } catch (WriteException we) {
+            return Future.exception(we);
+        }
+        if (null != rollingFuture) {
+            if (streamFailFast) {
+                result = Future.exception(new StreamNotReadyException("Rolling log segment"));
+            } else {
+                result = queueRequest(record, flush);
+            }
+        } else if (shouldRollLog(w)) {
+            // insert a last record, so when it is called back, we will trigger log segment rolling
+            startQueueingRequests();
+            if (null != w) {
+                LastPendingLogRecord lastLogRecordInCurrentSegment = new LastPendingLogRecord(record, flush);
+                w.asyncWrite(record, true).addEventListener(lastLogRecordInCurrentSegment);
+                result = lastLogRecordInCurrentSegment.promise;
+            } else { // no log segment yet. roll the log segment and issue pending requests.
+                result = queueRequest(record, flush);
+                rollLogSegmentAndIssuePendingRequests(record.getTransactionId());
+            }
+        } else {
+            result = w.asyncWrite(record, flush);
+        }
+        // use map here rather than onSuccess because we want lastTxId to be updated before
+        // satisfying the future
+        return result.map(new AbstractFunction1<DLSN, DLSN>() {
+            @Override
+            public DLSN apply(DLSN dlsn) {
+                setLastTxId(record.getTransactionId());
+                return dlsn;
+            }
+        });
+    }
+
+    private List<Future<DLSN>> asyncWriteBulk(List<LogRecord> records) {
+        final ArrayList<Future<DLSN>> results = new ArrayList<Future<DLSN>>(records.size());
+        Iterator<LogRecord> iterator = records.iterator();
+        while (iterator.hasNext()) {
+            LogRecord record = iterator.next();
+            Future<DLSN> future = asyncWrite(record, !iterator.hasNext());
+            results.add(future);
+
+            // Abort early if an individual write has already failed.
+            Option<Try<DLSN>> result = future.poll();
+            if (result.isDefined() && result.get().isThrow()) {
+                break;
+            }
+        }
+        if (records.size() > results.size()) {
+            appendCancelledFutures(results, records.size() - results.size());
+        }
+        return results;
+    }
+
+    private void appendCancelledFutures(List<Future<DLSN>> futures, int numToAdd) {
+        final WriteCancelledException cre =
+                new WriteCancelledException(getStreamName());
+        for (int i = 0; i < numToAdd; i++) {
+            Future<DLSN> cancelledFuture = Future.exception(cre);
+            futures.add(cancelledFuture);
+        }
+    }
+
+    private void rollLogSegmentAndIssuePendingRequests(final long firstTxId) {
+        getLogSegmentWriter(firstTxId, true, true)
+                .addEventListener(new FutureEventListener<BKLogSegmentWriter>() {
+                    @Override
+                    public void onSuccess(BKLogSegmentWriter writer) {
+                        try {
+                            synchronized (BKAsyncLogWriter.this) {
+                                for (PendingLogRecord pendingLogRecord : pendingRequests) {
+                                    FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_LogWriterIssuePending);
+                                    writer.asyncWrite(pendingLogRecord.record, pendingLogRecord.flush)
+                                            .addEventListener(pendingLogRecord);
+                                }
+                                // if there are no records in the pending queue, write a control record
+                                // so that when a new log segment is rolled, a control record will be added
+                                // and the corresponding bookies will be able to create the ledger.
+                                if (pendingRequests.isEmpty()) {
+                                    LogRecord controlRecord = new LogRecord(firstTxId,
+                                            DistributedLogConstants.CONTROL_RECORD_CONTENT);
+                                    controlRecord.setControl();
+                                    PendingLogRecord controlReq = new PendingLogRecord(controlRecord, false);
+                                    writer.asyncWrite(controlReq.record, controlReq.flush)
+                                            .addEventListener(controlReq);
+                                }
+                                if (null != rollingFuture) {
+                                    FutureUtils.setValue(rollingFuture, writer);
+                                }
+                                rollingFuture = null;
+                                pendingRequestDispatch.add(pendingRequests.size());
+                                pendingRequests = null;
+                            }
+                        } catch (IOException ioe) {
+                            errorOutPendingRequestsAndWriter(ioe);
+                        }
+                    }
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        errorOutPendingRequestsAndWriter(cause);
+                    }
+                });
+    }
+
+    @VisibleForTesting
+    void errorOutPendingRequests(Throwable cause, boolean errorOutWriter) {
+        final List<PendingLogRecord> pendingRequestsSnapshot;
+        synchronized (this) {
+            pendingRequestsSnapshot = pendingRequests;
+            encounteredError = errorOutWriter;
+            pendingRequests = null;
+            if (null != rollingFuture) {
+                FutureUtils.setException(rollingFuture, cause);
+            }
+            rollingFuture = null;
+        }
+
+        pendingRequestDispatch.add(pendingRequestsSnapshot.size());
+
+        // After erroring out the writer above, no more requests
+        // will be enqueued to pendingRequests
+        for (PendingLogRecord pendingLogRecord : pendingRequestsSnapshot) {
+            pendingLogRecord.promise.setException(cause);
+        }
+    }
+
+    void errorOutPendingRequestsAndWriter(Throwable cause) {
+        errorOutPendingRequests(cause, true /* error out writer */);
+    }
+
+    /**
+     * Write a log record to the stream.
+     *
+     * @param record single log record
+     */
+    @Override
+    public Future<DLSN> write(final LogRecord record) {
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        return FutureUtils.stats(
+                asyncWrite(record, true),
+                writeOpStatsLogger,
+                stopwatch);
+    }
+
+    /**
+     * Write many log records to the stream. The return type here is unfortunate but it's a direct result
+     * of having to combine FuturePool and the asyncWriteBulk method which returns a future as well. The
+     * problem is the List that asyncWriteBulk returns can't be materialized until getLogSegmentWriter
+     * completes, so it has to be wrapped in a future itself.
+     *
+     * @param records list of records
+     */
+    @Override
+    public Future<List<Future<DLSN>>> writeBulk(final List<LogRecord> records) {
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        return FutureUtils.stats(
+                Future.value(asyncWriteBulk(records)),
+                bulkWriteOpStatsLogger,
+                stopwatch);
+    }
+
+    @Override
+    public Future<Boolean> truncate(final DLSN dlsn) {
+        if (DLSN.InvalidDLSN == dlsn) {
+            return Future.value(false);
+        }
+        BKLogWriteHandler writeHandler;
+        try {
+            writeHandler = getWriteHandler();
+        } catch (IOException e) {
+            return Future.exception(e);
+        }
+        return writeHandler.setLogSegmentsOlderThanDLSNTruncated(dlsn).map(TruncationResultConverter);
+    }
+
+    Future<Long> flushAndCommit() {
+        Future<BKLogSegmentWriter> writerFuture;
+        synchronized (this) {
+            if (null != this.rollingFuture) {
+                writerFuture = this.rollingFuture;
+            } else {
+                writerFuture = getCachedLogWriterFuture();
+            }
+        }
+        if (null == writerFuture) {
+            return Future.value(getLastTxId());
+        }
+        return writerFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() {
+            @Override
+            public Future<Long> apply(BKLogSegmentWriter writer) {
+                return writer.flushAndCommit();
+            }
+        });
+    }
+
+    Future<Long> markEndOfStream() {
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        Future<BKLogSegmentWriter> logSegmentWriterFuture;
+        synchronized (this) {
+            logSegmentWriterFuture = this.rollingFuture;
+        }
+        if (null == logSegmentWriterFuture) {
+            logSegmentWriterFuture = getLogSegmentWriterForEndOfStream();
+        }
+
+        return FutureUtils.stats(
+                logSegmentWriterFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() {
+                    @Override
+                    public Future<Long> apply(BKLogSegmentWriter w) {
+                        return w.markEndOfStream();
+                    }
+                }),
+                markEndOfStreamOpStatsLogger,
+                stopwatch);
+    }
+
+    @Override
+    protected Future<Void> asyncCloseAndComplete() {
+        Future<BKLogSegmentWriter> logSegmentWriterFuture;
+        synchronized (this) {
+            logSegmentWriterFuture = this.rollingFuture;
+        }
+
+        if (null == logSegmentWriterFuture) {
+            return super.asyncCloseAndComplete();
+        } else {
+            return logSegmentWriterFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Void>>() {
+                @Override
+                public Future<Void> apply(BKLogSegmentWriter segmentWriter) {
+                    return BKAsyncLogWriter.super.asyncCloseAndComplete();
+                }
+            });
+        }
+    }
+
+    @Override
+    void closeAndComplete() throws IOException {
+        FutureUtils.result(asyncCloseAndComplete());
+    }
+
+    /**
+     * *TEMP HACK*
+     * Get the name of the stream this writer writes data to.
+     */
+    @Override
+    public String getStreamName() {
+        return bkDistributedLogManager.getStreamName();
+    }
+
+    @Override
+    public Future<Void> asyncAbort() {
+        Future<Void> result = super.asyncAbort();
+        synchronized (this) {
+            if (pendingRequests != null) {
+                for (PendingLogRecord pendingLogRecord : pendingRequests) {
+                    pendingLogRecord.promise.setException(new WriteException(bkDistributedLogManager.getStreamName(),
+                            "abort writing: writer has been closed due to error."));
+                }
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("AsyncLogWriter:%s", getStreamName());
+    }
+}
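The write path above queues records behind rollingFuture while a segment roll is in flight and dispatches them in rollLogSegmentAndIssuePendingRequests() once the roll completes. A minimal end-to-end sketch of that path, assuming DistributedLogManager#openAsyncLogWriter and com.twitter.util.Await from this codebase; the wrapper class, transaction id, and payload are hypothetical, not part of this patch:

import com.twitter.util.Await;
import java.nio.charset.StandardCharsets;
import org.apache.distributedlog.AsyncLogWriter;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogManager;
import org.apache.distributedlog.LogRecord;

public class WriterSketch {
    public static void writeAndTruncate(DistributedLogManager dlm) throws Exception {
        AsyncLogWriter writer = Await.result(dlm.openAsyncLogWriter());
        try {
            // write() goes through asyncWrite(); if a log segment roll is in
            // flight (rollingFuture != null) the record is queued as a
            // PendingLogRecord and dispatched once the roll completes
            LogRecord record = new LogRecord(1L, "hello".getBytes(StandardCharsets.UTF_8));
            DLSN dlsn = Await.result(writer.write(record));

            // truncate log segments older than the DLSN we just wrote
            Await.result(writer.truncate(dlsn));
        } finally {
            Await.result(writer.asyncClose());
        }
    }
}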