http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java deleted file mode 100644 index aee4103..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java +++ /dev/null @@ -1,751 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Stopwatch; -import com.google.common.base.Ticker; -import com.twitter.distributedlog.exceptions.DLIllegalStateException; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.exceptions.EndOfStreamException; -import com.twitter.distributedlog.exceptions.IdleReaderException; -import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.exceptions.ReadCancelledException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.distributedlog.util.Utils; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import com.twitter.util.Throw; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.versioning.Versioned; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Function1; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -/** - * BookKeeper based {@link AsyncLogReader} implementation. - * - * <h3>Metrics</h3> - * All the metrics are exposed under `async_reader`. - * <ul> - * <li> `async_reader`/future_set: opstats. time spent on satisfying futures of read requests. 
- * if it is high, it means that the caller takes time on processing the result of read requests. - * The side effect is blocking consequent reads. - * <li> `async_reader`/schedule: opstats. time spent on scheduling next reads. - * <li> `async_reader`/background_read: opstats. time spent on background reads. - * <li> `async_reader`/read_next_exec: opstats. time spent on executing {@link #readNext()}. - * <li> `async_reader`/time_between_read_next: opstats. time spent on between two consequent {@link #readNext()}. - * if it is high, it means that the caller is slowing down on calling {@link #readNext()}. - * <li> `async_reader`/delay_until_promise_satisfied: opstats. total latency for the read requests. - * <li> `async_reader`/idle_reader_error: counter. the number idle reader errors. - * </ul> - */ -class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { - static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReader.class); - - private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION = - new AbstractFunction1<List<LogRecordWithDLSN>, LogRecordWithDLSN>() { - @Override - public LogRecordWithDLSN apply(List<LogRecordWithDLSN> records) { - return records.get(0); - } - }; - - private final String streamName; - protected final BKDistributedLogManager bkDistributedLogManager; - protected final BKLogReadHandler readHandler; - private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(); - private final OrderedScheduler scheduler; - private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>(); - private final Object scheduleLock = new Object(); - private final AtomicLong scheduleCount = new AtomicLong(0); - final private Stopwatch scheduleDelayStopwatch; - final private Stopwatch readNextDelayStopwatch; - private DLSN startDLSN; - private ReadAheadEntryReader readAheadReader = null; - private int lastPosition = 0; 
- private final boolean positionGapDetectionEnabled; - private final int idleErrorThresholdMillis; - final ScheduledFuture<?> idleReaderTimeoutTask; - private ScheduledFuture<?> backgroundScheduleTask = null; - // last process time - private final Stopwatch lastProcessTime; - - protected Promise<Void> closeFuture = null; - - private boolean lockStream = false; - - private final boolean returnEndOfStreamRecord; - - private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() { - @Override - public void run() { - synchronized (scheduleLock) { - backgroundScheduleTask = null; - } - scheduleBackgroundRead(); - } - }; - - // State - private Entry.Reader currentEntry = null; - private LogRecordWithDLSN nextRecord = null; - - // Failure Injector - private boolean disableProcessingReadRequests = false; - - // Stats - private final OpStatsLogger readNextExecTime; - private final OpStatsLogger delayUntilPromiseSatisfied; - private final OpStatsLogger timeBetweenReadNexts; - private final OpStatsLogger futureSetLatency; - private final OpStatsLogger scheduleLatency; - private final OpStatsLogger backgroundReaderRunTime; - private final Counter idleReaderCheckCount; - private final Counter idleReaderCheckIdleReadRequestCount; - private final Counter idleReaderCheckIdleReadAheadCount; - private final Counter idleReaderError; - - private class PendingReadRequest { - private final Stopwatch enqueueTime; - private final int numEntries; - private final List<LogRecordWithDLSN> records; - private final Promise<List<LogRecordWithDLSN>> promise; - private final long deadlineTime; - private final TimeUnit deadlineTimeUnit; - - PendingReadRequest(int numEntries, - long deadlineTime, - TimeUnit deadlineTimeUnit) { - this.numEntries = numEntries; - this.enqueueTime = Stopwatch.createStarted(); - // optimize the space usage for single read. 
- if (numEntries == 1) { - this.records = new ArrayList<LogRecordWithDLSN>(1); - } else { - this.records = new ArrayList<LogRecordWithDLSN>(); - } - this.promise = new Promise<List<LogRecordWithDLSN>>(); - this.deadlineTime = deadlineTime; - this.deadlineTimeUnit = deadlineTimeUnit; - } - - Promise<List<LogRecordWithDLSN>> getPromise() { - return promise; - } - - long elapsedSinceEnqueue(TimeUnit timeUnit) { - return enqueueTime.elapsed(timeUnit); - } - - void setException(Throwable throwable) { - Stopwatch stopwatch = Stopwatch.createStarted(); - if (promise.updateIfEmpty(new Throw<List<LogRecordWithDLSN>>(throwable))) { - futureSetLatency.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS)); - } - } - - boolean hasReadRecords() { - return records.size() > 0; - } - - boolean hasReadEnoughRecords() { - return records.size() >= numEntries; - } - - long getRemainingWaitTime() { - if (deadlineTime <= 0L) { - return 0L; - } - return deadlineTime - elapsedSinceEnqueue(deadlineTimeUnit); - } - - void addRecord(LogRecordWithDLSN record) { - records.add(record); - } - - void complete() { - if (LOG.isTraceEnabled()) { - LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size()); - } - delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS)); - Stopwatch stopwatch = Stopwatch.createStarted(); - promise.setValue(records); - futureSetLatency.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - } - } - - BKAsyncLogReader(BKDistributedLogManager bkdlm, - OrderedScheduler scheduler, - DLSN startDLSN, - Optional<String> subscriberId, - boolean returnEndOfStreamRecord, - StatsLogger statsLogger) { - this.streamName = bkdlm.getStreamName(); - this.bkDistributedLogManager = bkdlm; - this.scheduler = scheduler; - this.readHandler = 
bkDistributedLogManager.createReadHandler(subscriberId, - this, true); - LOG.debug("Starting async reader at {}", startDLSN); - this.startDLSN = startDLSN; - this.scheduleDelayStopwatch = Stopwatch.createUnstarted(); - this.readNextDelayStopwatch = Stopwatch.createStarted(); - this.positionGapDetectionEnabled = bkdlm.getConf().getPositionGapDetectionEnabled(); - this.idleErrorThresholdMillis = bkdlm.getConf().getReaderIdleErrorThresholdMillis(); - this.returnEndOfStreamRecord = returnEndOfStreamRecord; - - // Stats - StatsLogger asyncReaderStatsLogger = statsLogger.scope("async_reader"); - futureSetLatency = asyncReaderStatsLogger.getOpStatsLogger("future_set"); - scheduleLatency = asyncReaderStatsLogger.getOpStatsLogger("schedule"); - backgroundReaderRunTime = asyncReaderStatsLogger.getOpStatsLogger("background_read"); - readNextExecTime = asyncReaderStatsLogger.getOpStatsLogger("read_next_exec"); - timeBetweenReadNexts = asyncReaderStatsLogger.getOpStatsLogger("time_between_read_next"); - delayUntilPromiseSatisfied = asyncReaderStatsLogger.getOpStatsLogger("delay_until_promise_satisfied"); - idleReaderError = asyncReaderStatsLogger.getCounter("idle_reader_error"); - idleReaderCheckCount = asyncReaderStatsLogger.getCounter("idle_reader_check_total"); - idleReaderCheckIdleReadRequestCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_read_requests"); - idleReaderCheckIdleReadAheadCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_readahead"); - - // Lock the stream if requested. The lock will be released when the reader is closed. 
- this.lockStream = false; - this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary(); - this.lastProcessTime = Stopwatch.createStarted(); - } - - private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() { - if (idleErrorThresholdMillis < Integer.MAX_VALUE) { - // Dont run the task more than once every seconds (for sanity) - long period = Math.max(idleErrorThresholdMillis / 10, 1000); - // Except when idle reader threshold is less than a second (tests?) - period = Math.min(period, idleErrorThresholdMillis / 5); - - return scheduler.scheduleAtFixedRate(streamName, new Runnable() { - @Override - public void run() { - PendingReadRequest nextRequest = pendingRequests.peek(); - - idleReaderCheckCount.inc(); - if (null == nextRequest) { - return; - } - - idleReaderCheckIdleReadRequestCount.inc(); - if (nextRequest.elapsedSinceEnqueue(TimeUnit.MILLISECONDS) < idleErrorThresholdMillis) { - return; - } - - ReadAheadEntryReader readAheadReader = getReadAheadReader(); - - // read request has been idle - // - cache has records but read request are idle, - // that means notification was missed between readahead and reader. 
- // - cache is empty and readahead is idle (no records added for a long time) - idleReaderCheckIdleReadAheadCount.inc(); - try { - if (null == readAheadReader || (!hasMoreRecords() && - readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS))) { - markReaderAsIdle(); - return; - } else if (lastProcessTime.elapsed(TimeUnit.MILLISECONDS) > idleErrorThresholdMillis) { - markReaderAsIdle();; - } - } catch (IOException e) { - setLastException(e); - return; - } - } - }, period, period, TimeUnit.MILLISECONDS); - } - return null; - } - - synchronized ReadAheadEntryReader getReadAheadReader() { - return readAheadReader; - } - - void cancelIdleReaderTask() { - // Do this after we have checked that the reader was not previously closed - try { - if (null != idleReaderTimeoutTask) { - idleReaderTimeoutTask.cancel(true); - } - } catch (Exception exc) { - LOG.info("{}: Failed to cancel the background idle reader timeout task", readHandler.getFullyQualifiedName()); - } - } - - private void markReaderAsIdle() { - idleReaderError.inc(); - IdleReaderException ire = new IdleReaderException("Reader on stream " - + readHandler.getFullyQualifiedName() - + " is idle for " + idleErrorThresholdMillis +" ms"); - setLastException(ire); - // cancel all pending reads directly rather than notifying on error - // because idle reader could happen on idle read requests that usually means something wrong - // in scheduling reads - cancelAllPendingReads(ire); - } - - protected synchronized void setStartDLSN(DLSN fromDLSN) throws UnexpectedException { - if (null != readAheadReader) { - throw new UnexpectedException("Could't reset from dlsn after reader already starts reading."); - } - startDLSN = fromDLSN; - } - - @VisibleForTesting - public synchronized DLSN getStartDLSN() { - return startDLSN; - } - - public Future<Void> lockStream() { - this.lockStream = true; - return readHandler.lockStream(); - } - - private boolean checkClosedOrInError(String operation) { - if (null == 
lastException.get()) { - try { - if (null != readHandler && null != getReadAheadReader()) { - getReadAheadReader().checkLastException(); - } - - bkDistributedLogManager.checkClosedOrInError(operation); - } catch (IOException exc) { - setLastException(exc); - } - } - - if (lockStream) { - try { - readHandler.checkReadLock(); - } catch (IOException ex) { - setLastException(ex); - } - } - - if (null != lastException.get()) { - LOG.trace("Cancelling pending reads"); - cancelAllPendingReads(lastException.get()); - return true; - } - - return false; - } - - private void setLastException(IOException exc) { - lastException.compareAndSet(null, exc); - } - - @Override - public String getStreamName() { - return streamName; - } - - /** - * @return A promise that when satisfied will contain the Log Record with its DLSN. - */ - @Override - public synchronized Future<LogRecordWithDLSN> readNext() { - return readInternal(1, 0, TimeUnit.MILLISECONDS).map(READ_NEXT_MAP_FUNCTION); - } - - public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries) { - return readInternal(numEntries, 0, TimeUnit.MILLISECONDS); - } - - @Override - public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries, - long waitTime, - TimeUnit timeUnit) { - return readInternal(numEntries, waitTime, timeUnit); - } - - /** - * Read up to <i>numEntries</i> entries. The future will be satisfied when any number of entries are - * ready (1 to <i>numEntries</i>). - * - * @param numEntries - * num entries to read - * @return A promise that satisfied with a non-empty list of log records with their DLSN. 
- */ - private synchronized Future<List<LogRecordWithDLSN>> readInternal(int numEntries, - long deadlineTime, - TimeUnit deadlineTimeUnit) { - timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS)); - readNextDelayStopwatch.reset().start(); - final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit); - - if (null == readAheadReader) { - final ReadAheadEntryReader readAheadEntryReader = this.readAheadReader = new ReadAheadEntryReader( - getStreamName(), - getStartDLSN(), - bkDistributedLogManager.getConf(), - readHandler, - bkDistributedLogManager.getReaderEntryStore(), - bkDistributedLogManager.getScheduler(), - Ticker.systemTicker(), - bkDistributedLogManager.alertStatsLogger); - readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() { - @Override - public void onSuccess(Void value) { - try { - readHandler.registerListener(readAheadEntryReader); - readHandler.asyncStartFetchLogSegments() - .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() { - @Override - public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) { - readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this); - readAheadEntryReader.start(logSegments.getValue()); - return BoxedUnit.UNIT; - } - }); - } catch (Exception exc) { - notifyOnError(exc); - } - } - - @Override - public void onFailure(Throwable cause) { - notifyOnError(cause); - } - }); - } - - if (checkClosedOrInError("readNext")) { - readRequest.setException(lastException.get()); - } else { - boolean queueEmpty = pendingRequests.isEmpty(); - pendingRequests.add(readRequest); - - if (queueEmpty) { - scheduleBackgroundRead(); - } - } - - readNextExecTime.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS)); - readNextDelayStopwatch.reset().start(); - - return readRequest.getPromise(); - } - - public synchronized void scheduleBackgroundRead() { - 
// if the reader is already closed, we don't need to schedule background read again. - if (null != closeFuture) { - return; - } - - long prevCount = scheduleCount.getAndIncrement(); - if (0 == prevCount) { - scheduleDelayStopwatch.reset().start(); - scheduler.submit(streamName, this); - } - } - - @Override - public Future<Void> asyncClose() { - // Cancel the idle reader timeout task, interrupting if necessary - ReadCancelledException exception; - Promise<Void> closePromise; - synchronized (this) { - if (null != closeFuture) { - return closeFuture; - } - closePromise = closeFuture = new Promise<Void>(); - exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed"); - setLastException(exception); - } - - // Do this after we have checked that the reader was not previously closed - cancelIdleReaderTask(); - - synchronized (scheduleLock) { - if (null != backgroundScheduleTask) { - backgroundScheduleTask.cancel(true); - } - } - - cancelAllPendingReads(exception); - - ReadAheadEntryReader readAheadReader = getReadAheadReader(); - if (null != readAheadReader) { - readHandler.unregisterListener(readAheadReader); - readAheadReader.removeStateChangeNotification(this); - } - Utils.closeSequence(bkDistributedLogManager.getScheduler(), true, - readAheadReader, - readHandler - ).proxyTo(closePromise); - return closePromise; - } - - private void cancelAllPendingReads(Throwable throwExc) { - for (PendingReadRequest promise : pendingRequests) { - promise.setException(throwExc); - } - pendingRequests.clear(); - } - - synchronized boolean hasMoreRecords() throws IOException { - if (null == readAheadReader) { - return false; - } - if (readAheadReader.getNumCachedEntries() > 0 || null != nextRecord) { - return true; - } else if (null != currentEntry) { - nextRecord = currentEntry.nextRecord(); - return null != nextRecord; - } - return false; - } - - private synchronized LogRecordWithDLSN readNextRecord() throws IOException { - if (null == 
readAheadReader) { - return null; - } - if (null == currentEntry) { - currentEntry = readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS); - // no entry after reading from read ahead then return null - if (null == currentEntry) { - return null; - } - } - - LogRecordWithDLSN recordToReturn; - if (null == nextRecord) { - nextRecord = currentEntry.nextRecord(); - // no more records in current entry - if (null == nextRecord) { - currentEntry = null; - return readNextRecord(); - } - } - - // found a record to return and prefetch the next one - recordToReturn = nextRecord; - nextRecord = currentEntry.nextRecord(); - return recordToReturn; - } - - @Override - public void run() { - synchronized(scheduleLock) { - if (scheduleDelayStopwatch.isRunning()) { - scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - } - - Stopwatch runTime = Stopwatch.createStarted(); - int iterations = 0; - long scheduleCountLocal = scheduleCount.get(); - LOG.debug("{}: Scheduled Background Reader", readHandler.getFullyQualifiedName()); - while(true) { - if (LOG.isTraceEnabled()) { - LOG.trace("{}: Executing Iteration: {}", readHandler.getFullyQualifiedName(), iterations++); - } - - PendingReadRequest nextRequest = null; - synchronized(this) { - nextRequest = pendingRequests.peek(); - - // Queue is empty, nothing to read, return - if (null == nextRequest) { - LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName()); - scheduleCount.set(0); - backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); - return; - } - - if (disableProcessingReadRequests) { - LOG.info("Reader of {} is forced to stop processing read requests", readHandler.getFullyQualifiedName()); - return; - } - } - lastProcessTime.reset().start(); - - // If the oldest pending promise is interrupted then we must mark - // the reader in error and abort all pending reads since we dont - // know the last 
consumed read - if (null == lastException.get()) { - if (nextRequest.getPromise().isInterrupted().isDefined()) { - setLastException(new DLInterruptedException("Interrupted on reading " + readHandler.getFullyQualifiedName() + " : ", - nextRequest.getPromise().isInterrupted().get())); - } - } - - if (checkClosedOrInError("readNext")) { - if (!(lastException.get().getCause() instanceof LogNotFoundException)) { - LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get()); - } - backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); - return; - } - - try { - // Fail 10% of the requests when asked to simulate errors - if (bkDistributedLogManager.getFailureInjector().shouldInjectErrors()) { - throw new IOException("Reader Simulated Exception"); - } - LogRecordWithDLSN record; - while (!nextRequest.hasReadEnoughRecords()) { - // read single record - do { - record = readNextRecord(); - } while (null != record && (record.isControl() || (record.getDlsn().compareTo(getStartDLSN()) < 0))); - if (null == record) { - break; - } else { - if (record.isEndOfStream() && !returnEndOfStreamRecord) { - setLastException(new EndOfStreamException("End of Stream Reached for " - + readHandler.getFullyQualifiedName())); - break; - } - - // gap detection - if (recordPositionsContainsGap(record, lastPosition)) { - bkDistributedLogManager.raiseAlert("Gap detected between records at record = {}", record); - if (positionGapDetectionEnabled) { - throw new DLIllegalStateException("Gap detected between records at record = " + record); - } - } - lastPosition = record.getLastPositionWithinLogSegment(); - - nextRequest.addRecord(record); - } - }; - } catch (IOException exc) { - setLastException(exc); - if (!(exc instanceof LogNotFoundException)) { - LOG.warn("{} : read with skip Exception", readHandler.getFullyQualifiedName(), lastException.get()); - } - continue; - } - - if (nextRequest.hasReadRecords()) { - long remainingWaitTime = 
nextRequest.getRemainingWaitTime(); - if (remainingWaitTime > 0 && !nextRequest.hasReadEnoughRecords()) { - backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); - scheduleDelayStopwatch.reset().start(); - scheduleCount.set(0); - // the request could still wait for more records - backgroundScheduleTask = scheduler.schedule( - streamName, - BACKGROUND_READ_SCHEDULER, - remainingWaitTime, - nextRequest.deadlineTimeUnit); - return; - } - - PendingReadRequest request = pendingRequests.poll(); - if (null != request && nextRequest == request) { - request.complete(); - if (null != backgroundScheduleTask) { - backgroundScheduleTask.cancel(true); - backgroundScheduleTask = null; - } - } else { - DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at dlsn = " - + nextRequest.records.get(0).getDlsn()); - nextRequest.setException(ise); - if (null != request) { - request.setException(ise); - } - // We should never get here as we should have exited the loop if - // pendingRequests were empty - bkDistributedLogManager.raiseAlert("Unexpected condition at dlsn = {}", - nextRequest.records.get(0).getDlsn()); - setLastException(ise); - } - } else { - if (0 == scheduleCountLocal) { - LOG.trace("Schedule count dropping to zero", lastException.get()); - backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); - return; - } - scheduleCountLocal = scheduleCount.decrementAndGet(); - } - } - } - } - - private boolean recordPositionsContainsGap(LogRecordWithDLSN record, long lastPosition) { - final boolean firstLogRecord = (1 == record.getPositionWithinLogSegment()); - final boolean endOfStreamRecord = record.isEndOfStream(); - final boolean emptyLogSegment = (0 == lastPosition); - final boolean positionIncreasedByOne = (record.getPositionWithinLogSegment() == (lastPosition + 1)); - - return !firstLogRecord && !endOfStreamRecord && !emptyLogSegment && - !positionIncreasedByOne; - } - 
- /** - * Triggered when the background activity encounters an exception - */ - @Override - public void notifyOnError(Throwable cause) { - if (cause instanceof IOException) { - setLastException((IOException) cause); - } else { - setLastException(new IOException(cause)); - } - scheduleBackgroundRead(); - } - - /** - * Triggered when the background activity completes an operation - */ - @Override - public void notifyOnOperationComplete() { - scheduleBackgroundRead(); - } - - @VisibleForTesting - void simulateErrors() { - bkDistributedLogManager.getFailureInjector().injectErrors(true); - } - - @VisibleForTesting - synchronized void disableReadAheadLogSegmentsNotification() { - readHandler.disableReadAheadLogSegmentsNotification(); - } - - @VisibleForTesting - synchronized void disableProcessingReadRequests() { - disableProcessingReadRequests = true; - } -} -
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java deleted file mode 100644 index 9432e8a..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java +++ /dev/null @@ -1,559 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog; - -import com.google.common.base.Stopwatch; -import com.google.common.annotations.VisibleForTesting; -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import com.twitter.distributedlog.exceptions.StreamNotReadyException; -import com.twitter.distributedlog.exceptions.WriteCancelledException; -import com.twitter.distributedlog.exceptions.WriteException; -import com.twitter.distributedlog.feature.CoreFeatureKeys; -import com.twitter.distributedlog.util.FailpointUtils; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import com.twitter.util.Try; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Function1; -import scala.Option; -import scala.runtime.AbstractFunction1; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - -/** - * BookKeeper based {@link AsyncLogWriter} implementation. - * - * <h3>Metrics</h3> - * All the metrics are exposed under `log_writer`. - * <ul> - * <li> `log_writer/write`: opstats. latency characteristics about the time that write operations spent. - * <li> `log_writer/bulk_write`: opstats. latency characteristics about the time that bulk_write - * operations spent. - * are pending in the queue for long time due to log segment rolling. - * <li> `log_writer/get_writer`: opstats. the time spent on getting the writer. it could spike when there - * is log segment rolling happened during getting the writer. it is a good stat to look into when the latency - * is caused by queuing time. 
* <li> `log_writer/pending_request_dispatch`: counter. the number of queued operations that are dispatched
* after log segment is rolled. it is a metric measuring how many operations have been queued because of
* log segment rolling.
* </ul>
* See {@link BKLogSegmentWriter} for segment writer stats.
*/
public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {

    static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogWriter.class);

    /**
     * Maps the list of segments returned by a truncation to the constant {@code true};
     * used by {@link #truncate(DLSN)}.
     */
    static final Function1<List<LogSegmentMetadata>, Boolean> TruncationResultConverter =
        new AbstractFunction1<List<LogSegmentMetadata>, Boolean>() {
            @Override
            public Boolean apply(List<LogSegmentMetadata> segments) {
                return true;
            }
        };

    /**
     * A record that is waiting for a log segment roll, together with the promise handed back
     * to the caller. It is registered as the listener of the underlying segment write, so the
     * outcome of that write is relayed to {@link #promise}.
     */
    class PendingLogRecord implements FutureEventListener<DLSN> {

        final LogRecord record;
        final Promise<DLSN> promise;
        final boolean flush;

        PendingLogRecord(LogRecord record, boolean flush) {
            this.record = record;
            this.promise = new Promise<DLSN>();
            this.flush = flush;
        }

        @Override
        public void onSuccess(DLSN value) {
            // The promise may already have been failed by errorOutPendingRequests() or
            // asyncAbort() while this write was in flight, so satisfy it through FutureUtils
            // (the helper this class already uses for rollingFuture) rather than the raw
            // Promise setter.
            // NOTE(review): FutureUtils.setValue is expected to leave an already satisfied
            // promise untouched instead of throwing - confirm against FutureUtils.
            FutureUtils.setValue(promise, value);
        }

        @Override
        public void onFailure(Throwable cause) {
            // Flag the writer first: continuations registered on the promise may run as soon
            // as it is satisfied and must not observe the writer as healthy.
            encounteredError = true;
            FutureUtils.setException(promise, cause);
        }
    }

    /**
     * Last pending record in current log segment. After it is satisfied, it would
     * roll log segment.
     *
     * This implementation is based on the assumption that all futures are satisfied in the
     * same ordered future pool.
     */
    class LastPendingLogRecord extends PendingLogRecord {

        LastPendingLogRecord(LogRecord record, boolean flush) {
            super(record, flush);
        }

        @Override
        public void onSuccess(DLSN value) {
            super.onSuccess(value);
            // roll log segment and issue all pending requests.
            rollLogSegmentAndIssuePendingRequests(record.getTransactionId());
        }

        @Override
        public void onFailure(Throwable cause) {
            super.onFailure(cause);
            // error out pending requests.
            errorOutPendingRequestsAndWriter(cause);
        }
    }

    private final boolean streamFailFast;
    private final boolean disableRollOnSegmentError;
    // Non-null only while a log segment roll is in progress; created together with
    // rollingFuture by startQueueingRequests() and cleared together with it.
    private LinkedList<PendingLogRecord> pendingRequests = null;
    private volatile boolean encounteredError = false;
    private Promise<BKLogSegmentWriter> rollingFuture = null;
    private long lastTxId = DistributedLogConstants.INVALID_TXID;

    private final StatsLogger statsLogger;
    private final OpStatsLogger writeOpStatsLogger;
    private final OpStatsLogger markEndOfStreamOpStatsLogger;
    private final OpStatsLogger bulkWriteOpStatsLogger;
    private final OpStatsLogger getWriterOpStatsLogger;
    private final Counter pendingRequestDispatch;

    private final Feature disableLogSegmentRollingFeature;

    BKAsyncLogWriter(DistributedLogConfiguration conf,
                     DynamicDistributedLogConfiguration dynConf,
                     BKDistributedLogManager bkdlm,
                     BKLogWriteHandler writeHandler, /** log writer owns the handler **/
                     FeatureProvider featureProvider,
                     StatsLogger dlmStatsLogger) {
        super(conf, dynConf, bkdlm);
        this.writeHandler = writeHandler;
        this.streamFailFast = conf.getFailFastOnStreamNotReady();
        this.disableRollOnSegmentError = conf.getDisableRollingOnLogSegmentError();

        // features
        // Lower-case with a fixed locale: with the default locale the key would be mangled on
        // JVMs whose locale has special casing rules for 'I' (e.g. Turkish).
        disableLogSegmentRollingFeature = featureProvider.getFeature(
                CoreFeatureKeys.DISABLE_LOGSEGMENT_ROLLING.name().toLowerCase(java.util.Locale.ROOT));
        // stats
        this.statsLogger = dlmStatsLogger.scope("log_writer");
        this.writeOpStatsLogger = statsLogger.getOpStatsLogger("write");
        this.markEndOfStreamOpStatsLogger = statsLogger.getOpStatsLogger("mark_end_of_stream");
        this.bulkWriteOpStatsLogger = statsLogger.getOpStatsLogger("bulk_write");
        this.getWriterOpStatsLogger = statsLogger.getOpStatsLogger("get_writer");
        this.pendingRequestDispatch = statsLogger.getCounter("pending_request_dispatch");
    }

    /**
     * Advance the last transaction id. The stored value never moves backwards: it becomes the
     * maximum of the current value and <i>txId</i>.
     *
     * @param txId transaction id of a record
     */
    @VisibleForTesting
    synchronized void setLastTxId(long txId) {
        lastTxId = Math.max(lastTxId, txId);
    }

    @Override
    public synchronized long getLastTxId() {
        return lastTxId;
    }

    /**
     * Write a log record as control record. The method will be used by Monitor Service to enforce a new
     * inprogress segment.
     *
     * @param record
     *          log record
     * @return future of the write
     */
    public Future<DLSN> writeControlRecord(final LogRecord record) {
        record.setControl();
        return write(record);
    }

    /**
     * Return the cached log segment writer.
     *
     * @return the cached writer; null when none is cached, or when the cached one reports a
     *         log segment error and rolling on segment error has not been disabled
     * @throws WriteException if this writer has already encountered an error
     */
    private BKLogSegmentWriter getCachedLogSegmentWriter() throws WriteException {
        if (encounteredError) {
            throw new WriteException(bkDistributedLogManager.getStreamName(),
                    "writer has been closed due to error.");
        }
        BKLogSegmentWriter segmentWriter = getCachedLogWriter();
        if (null != segmentWriter
                && segmentWriter.isLogSegmentInError()
                && !disableRollOnSegmentError) {
            return null;
        } else {
            return segmentWriter;
        }
    }

    /**
     * Same as {@link #doGetLogSegmentWriter(long, boolean, boolean, boolean)}, with the
     * outcome recorded under the `get_writer` op stats.
     */
    private Future<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid,
                                                           boolean bestEffort,
                                                           boolean rollLog,
                                                           boolean allowMaxTxID) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        return FutureUtils.stats(
                doGetLogSegmentWriter(firstTxid, bestEffort, rollLog, allowMaxTxID),
                getWriterOpStatsLogger,
                stopwatch);
    }

    /**
     * Obtain a log segment writer.
     *
     * @param firstTxid first transaction id, handed to the roll
     * @param bestEffort handed to the roll
     * @param rollLog whether an existing writer should go through the roll check as well
     * @param allowMaxTxID handed to the roll
     * @return future of the writer; a failed future if this writer has already encountered
     *         an error
     */
    private Future<BKLogSegmentWriter> doGetLogSegmentWriter(final long firstTxid,
                                                             final boolean bestEffort,
                                                             final boolean rollLog,
                                                             final boolean allowMaxTxID) {
        if (encounteredError) {
            return Future.exception(new WriteException(bkDistributedLogManager.getStreamName(),
                    "writer has been closed due to error."));
        }
        Future<BKLogSegmentWriter> writerFuture = asyncGetLedgerWriter(!disableRollOnSegmentError);
        if (null == writerFuture) {
            // no writer yet
            return rollLogSegmentIfNecessary(null, firstTxid, bestEffort, allowMaxTxID);
        } else if (rollLog) {
            return writerFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() {
                @Override
                public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter writer) {
                    return rollLogSegmentIfNecessary(writer, firstTxid, bestEffort, allowMaxTxID);
                }
            });
        } else {
            return writerFuture;
        }
    }

    /**
     * We write end of stream marker by writing a record with MAX_TXID, so we need to allow using
     * max txid when rolling for this case only.
     */
    private Future<BKLogSegmentWriter> getLogSegmentWriterForEndOfStream() {
        return getLogSegmentWriter(DistributedLogConstants.MAX_TXID,
                                   false /* bestEffort */,
                                   false /* roll log */,
                                   true /* allow max txid */);
    }

    /**
     * Variant of {@link #getLogSegmentWriter(long, boolean, boolean, boolean)} that does not
     * allow the max txid.
     */
    private Future<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid,
                                                           boolean bestEffort,
                                                           boolean rollLog) {
        return getLogSegmentWriter(firstTxid, bestEffort, rollLog, false /* allow max txid */);
    }

    /**
     * Append a record to the pending queue. Callers must have started queueing first (see
     * {@link #startQueueingRequests()}), otherwise the queue does not exist.
     *
     * @param record record to queue
     * @param flush flush flag to use once the record is issued
     * @return future satisfied once the queued record has been written or errored out
     */
    Future<DLSN> queueRequest(LogRecord record, boolean flush) {
        PendingLogRecord pendingLogRecord = new PendingLogRecord(record, flush);
        pendingRequests.add(pendingLogRecord);
        return pendingLogRecord.promise;
    }

    /**
     * Decide whether the log segment has to be rolled before the next write.
     *
     * @param w current log segment writer, may be null
     * @return true if there is no writer, or if rolling is not disabled by feature and a new
     *         segment should be started; false if that check fails with an I/O error
     */
    boolean shouldRollLog(BKLogSegmentWriter w) {
        try {
            return null == w ||
                    (!disableLogSegmentRollingFeature.isAvailable() &&
                            shouldStartNewSegment(w));
        } catch (IOException ioe) {
            // Best effort: keep writing to the current segment, but leave a trace of the
            // failure instead of dropping it silently.
            LOG.warn("Failed to check whether log segment of {} should be rolled;"
                    + " continue writing to current log segment", getStreamName(), ioe);
            return false;
        }
    }

    /**
     * Enter queueing mode: create the pending queue and the promise of the next writer.
     */
    void startQueueingRequests() {
        assert(null == pendingRequests && null == rollingFuture);
        pendingRequests = new LinkedList<PendingLogRecord>();
        rollingFuture = new Promise<BKLogSegmentWriter>();
    }

    // for ordering guarantee, we shouldn't send requests to next log segments until
    // previous log segment is done.
    private synchronized Future<DLSN> asyncWrite(final LogRecord record,
                                                 boolean flush) {
        // Fetch the cached writer while holding the lock, so that a writer cached by a roll
        // that just completed is the one being used.
        Future<DLSN> result = null;
        BKLogSegmentWriter w;
        try {
            w = getCachedLogSegmentWriter();
        } catch (WriteException we) {
            return Future.exception(we);
        }
        if (null != rollingFuture) {
            // a roll is in progress: either reject or queue behind it
            if (streamFailFast) {
                result = Future.exception(new StreamNotReadyException("Rolling log segment"));
            } else {
                result = queueRequest(record, flush);
            }
        } else if (shouldRollLog(w)) {
            // insert a last record, so when it called back, we will trigger a log segment rolling
            startQueueingRequests();
            if (null != w) {
                LastPendingLogRecord lastLogRecordInCurrentSegment = new LastPendingLogRecord(record, flush);
                w.asyncWrite(record, true).addEventListener(lastLogRecordInCurrentSegment);
                result = lastLogRecordInCurrentSegment.promise;
            } else { // no log segment yet. roll the log segment and issue pending requests.
                result = queueRequest(record, flush);
                rollLogSegmentAndIssuePendingRequests(record.getTransactionId());
            }
        } else {
            // shouldRollLog() returns true for a null writer, so w is non-null here
            result = w.asyncWrite(record, flush);
        }
        // use map here rather than onSuccess because we want lastTxId to be updated before
        // satisfying the future
        return result.map(new AbstractFunction1<DLSN, DLSN>() {
            @Override
            public DLSN apply(DLSN dlsn) {
                setLastTxId(record.getTransactionId());
                return dlsn;
            }
        });
    }

    /**
     * Issue the records one by one; only the last record is written with flush set. Stops at
     * the first write that has already failed and pads the result with cancelled futures, so
     * the returned list always has one entry per input record.
     *
     * @param records records to write
     * @return one future per record, in input order
     */
    private List<Future<DLSN>> asyncWriteBulk(List<LogRecord> records) {
        final ArrayList<Future<DLSN>> results = new ArrayList<Future<DLSN>>(records.size());
        Iterator<LogRecord> iterator = records.iterator();
        while (iterator.hasNext()) {
            LogRecord record = iterator.next();
            Future<DLSN> future = asyncWrite(record, !iterator.hasNext());
            results.add(future);

            // Abort early if an individual write has already failed.
            Option<Try<DLSN>> result = future.poll();
            if (result.isDefined() && result.get().isThrow()) {
                break;
            }
        }
        if (records.size() > results.size()) {
            appendCancelledFutures(results, records.size() - results.size());
        }
        return results;
    }

    /**
     * Append <i>numToAdd</i> futures to <i>futures</i>, all failed with one shared
     * {@link WriteCancelledException}.
     */
    private void appendCancelledFutures(List<Future<DLSN>> futures, int numToAdd) {
        final WriteCancelledException cre =
                new WriteCancelledException(getStreamName());
        for (int i = 0; i < numToAdd; i++) {
            Future<DLSN> cancelledFuture = Future.exception(cre);
            futures.add(cancelledFuture);
        }
    }

    /**
     * Roll the log segment and, once the new writer is available, issue every queued record
     * to it and leave queueing mode. If the roll fails, or issuing the queued records fails
     * with an I/O error, the queued records and the writer are errored out.
     *
     * @param firstTxId first transaction id to use for the roll
     */
    private void rollLogSegmentAndIssuePendingRequests(final long firstTxId) {
        getLogSegmentWriter(firstTxId, true, true)
                .addEventListener(new FutureEventListener<BKLogSegmentWriter>() {
            @Override
            public void onSuccess(BKLogSegmentWriter writer) {
                try {
                    synchronized (BKAsyncLogWriter.this) {
                        if (null == pendingRequests) {
                            // errorOutPendingRequests() has already drained the queue (and
                            // failed rollingFuture) while the roll was in flight, so there is
                            // nothing left to dispatch.
                            return;
                        }
                        for (PendingLogRecord pendingLogRecord : pendingRequests) {
                            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_LogWriterIssuePending);
                            writer.asyncWrite(pendingLogRecord.record, pendingLogRecord.flush)
                                    .addEventListener(pendingLogRecord);
                        }
                        // if there are no records in the pending queue, let's write a control record
                        // so that when a new log segment is rolled, a control record will be added and
                        // the corresponding bookies would be able to create its ledger.
                        if (pendingRequests.isEmpty()) {
                            LogRecord controlRecord = new LogRecord(firstTxId,
                                    DistributedLogConstants.CONTROL_RECORD_CONTENT);
                            controlRecord.setControl();
                            PendingLogRecord controlReq = new PendingLogRecord(controlRecord, false);
                            writer.asyncWrite(controlReq.record, controlReq.flush)
                                    .addEventListener(controlReq);
                        }
                        if (null != rollingFuture) {
                            FutureUtils.setValue(rollingFuture, writer);
                        }
                        rollingFuture = null;
                        pendingRequestDispatch.add(pendingRequests.size());
                        pendingRequests = null;
                    }
                } catch (IOException ioe) {
                    errorOutPendingRequestsAndWriter(ioe);
                }
            }
            @Override
            public void onFailure(Throwable cause) {
                errorOutPendingRequestsAndWriter(cause);
            }
        });
    }

    /**
     * Fail {@code rollingFuture} and every queued record with <i>cause</i> and leave queueing
     * mode. Safe to call when nothing is queued.
     *
     * @param cause failure handed to the pending promises
     * @param errorOutWriter value assigned to the writer's error flag
     */
    @VisibleForTesting
    void errorOutPendingRequests(Throwable cause, boolean errorOutWriter) {
        final List<PendingLogRecord> pendingRequestsSnapshot;
        synchronized (this) {
            pendingRequestsSnapshot = pendingRequests;
            // NOTE(review): passing false clears a previously recorded error - kept as is,
            // confirm this is intended.
            encounteredError = errorOutWriter;
            pendingRequests = null;
            if (null != rollingFuture) {
                FutureUtils.setException(rollingFuture, cause);
            }
            rollingFuture = null;
        }

        if (null == pendingRequestsSnapshot) {
            // No roll was in progress, or an earlier call already drained the queue.
            return;
        }

        pendingRequestDispatch.add(pendingRequestsSnapshot.size());

        // After erroring out the writer above, no more requests
        // will be enqueued to pendingRequests
        for (PendingLogRecord pendingLogRecord : pendingRequestsSnapshot) {
            // A queued promise may already be satisfied (failed by asyncAbort(), or its write
            // was dispatched before the failure), hence FutureUtils instead of the raw setter.
            FutureUtils.setException(pendingLogRecord.promise, cause);
        }
    }

    /**
     * Error out the queued records and mark the writer itself as errored.
     *
     * @param cause failure handed to the pending promises
     */
    void errorOutPendingRequestsAndWriter(Throwable cause) {
        errorOutPendingRequests(cause, true /* error out writer */);
    }

    /**
     * Write a log record to the stream. The record is written with flush set, and the outcome
     * is recorded under the `write` op stats.
     *
     * @param record single log record
     * @return future of the write
     */
    @Override
    public Future<DLSN> write(final LogRecord record) {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        return FutureUtils.stats(
                asyncWrite(record, true),
                writeOpStatsLogger,
                stopwatch);
    }

    /**
     * Write many log records to the stream. The return type here is unfortunate but it's a direct result
     * of having to combine FuturePool and the asyncWriteBulk method which returns a future as well. The
     * problem is the List that asyncWriteBulk returns can't be materialized until getLogSegmentWriter
     * completes, so it has to be wrapped in a future itself.
     *
     * @param records list of records
     * @return future of the list holding one write future per record
     */
    @Override
    public Future<List<Future<DLSN>>> writeBulk(final List<LogRecord> records) {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        return FutureUtils.stats(
                Future.value(asyncWriteBulk(records)),
                bulkWriteOpStatsLogger,
                stopwatch);
    }

    /**
     * Mark the log segments older than <i>dlsn</i> as truncated.
     *
     * @param dlsn position to truncate up to
     * @return future of false if <i>dlsn</i> is {@link DLSN#InvalidDLSN}; otherwise a future
     *         that becomes true once the write handler has marked the segments, or fails if
     *         the write handler cannot be obtained or the marking fails
     */
    @Override
    public Future<Boolean> truncate(final DLSN dlsn) {
        if (DLSN.InvalidDLSN == dlsn) {
            return Future.value(false);
        }
        BKLogWriteHandler writeHandler;
        try {
            writeHandler = getWriteHandler();
        } catch (IOException e) {
            return Future.exception(e);
        }
        return writeHandler.setLogSegmentsOlderThanDLSNTruncated(dlsn).map(TruncationResultConverter);
    }

    /**
     * Flush and commit through the log segment writer. While a roll is in progress this waits
     * for the writer produced by the roll; if there is no writer at all, the last transaction
     * id is returned right away.
     *
     * @return future of the result of the writer's flush and commit
     */
    Future<Long> flushAndCommit() {
        Future<BKLogSegmentWriter> writerFuture;
        synchronized (this) {
            if (null != this.rollingFuture) {
                writerFuture = this.rollingFuture;
            } else {
                writerFuture = getCachedLogWriterFuture();
            }
        }
        if (null == writerFuture) {
            return Future.value(getLastTxId());
        }
        return writerFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() {
            @Override
            public Future<Long> apply(BKLogSegmentWriter writer) {
                return writer.flushAndCommit();
            }
        });
    }

    /**
     * Mark the end of the stream through the log segment writer. While a roll is in progress
     * the writer produced by the roll is used, otherwise a writer is obtained for the end of
     * stream marker. The outcome is recorded under the `mark_end_of_stream` op stats.
     *
     * @return future of the result of the writer's end of stream marking
     */
    Future<Long> markEndOfStream() {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        Future<BKLogSegmentWriter> logSegmentWriterFuture;
        synchronized (this) {
            logSegmentWriterFuture = this.rollingFuture;
        }
        if (null == logSegmentWriterFuture) {
            logSegmentWriterFuture = getLogSegmentWriterForEndOfStream();
        }

        return FutureUtils.stats(
                logSegmentWriterFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() {
                    @Override
                    public Future<Long> apply(BKLogSegmentWriter w) {
                        return w.markEndOfStream();
                    }
                }),
                markEndOfStreamOpStatsLogger,
                stopwatch);
    }

    /**
     * Close and complete the writer. If a roll is in progress, closing is deferred until the
     * roll has produced its writer.
     */
    @Override
    protected Future<Void> asyncCloseAndComplete() {
        Future<BKLogSegmentWriter> logSegmentWriterFuture;
        synchronized (this) {
            logSegmentWriterFuture = this.rollingFuture;
        }

        if (null == logSegmentWriterFuture) {
            return super.asyncCloseAndComplete();
        } else {
            return logSegmentWriterFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Void>>() {
                @Override
                public Future<Void> apply(BKLogSegmentWriter segmentWriter) {
                    return BKAsyncLogWriter.super.asyncCloseAndComplete();
                }
            });
        }
    }

    /**
     * Variant of {@link #asyncCloseAndComplete()} that goes through
     * {@code FutureUtils.result}.
     */
    @Override
    void closeAndComplete() throws IOException {
        FutureUtils.result(asyncCloseAndComplete());
    }

    /**
     * *TEMP HACK*
     * Get the name of the stream this writer writes data to
     */
    @Override
    public String getStreamName() {
        return bkDistributedLogManager.getStreamName();
    }

    /**
     * Abort the writer and fail every record that is still queued behind a log segment roll.
     * The queue itself is left in place; it is cleared when the roll completes or errors out.
     */
    @Override
    public Future<Void> asyncAbort() {
        Future<Void> result = super.asyncAbort();
        synchronized (this) {
            if (pendingRequests != null) {
                for (PendingLogRecord pendingLogRecord : pendingRequests) {
                    // Satisfied through FutureUtils: the same promises are satisfied again
                    // when the in-flight roll completes or errors out.
                    FutureUtils.setException(pendingLogRecord.promise,
                            new WriteException(bkDistributedLogManager.getStreamName(),
                                    "abort wring: writer has been closed due to error."));
                }
            }
        }
        return result;
    }

    @Override
    public String toString() {
        return String.format("AsyncLogWriter:%s", getStreamName());
    }
}