http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java new file mode 100644 index 0000000..94e618a --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java @@ -0,0 +1,966 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; +import com.google.common.collect.Lists; +import com.twitter.distributedlog.callback.LogSegmentListener; +import com.twitter.distributedlog.exceptions.AlreadyTruncatedTransactionException; +import com.twitter.distributedlog.exceptions.DLIllegalStateException; +import com.twitter.distributedlog.exceptions.DLInterruptedException; +import com.twitter.distributedlog.exceptions.EndOfLogSegmentException; +import com.twitter.distributedlog.exceptions.LogNotFoundException; +import com.twitter.distributedlog.exceptions.UnexpectedException; +import com.twitter.distributedlog.io.AsyncCloseable; +import com.twitter.distributedlog.logsegment.LogSegmentEntryReader; +import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; +import com.twitter.distributedlog.logsegment.LogSegmentFilter; +import com.twitter.distributedlog.util.OrderedScheduler; +import com.twitter.util.Function0; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Futures; +import com.twitter.util.Promise; +import org.apache.bookkeeper.stats.AlertStatsLogger; +import org.apache.bookkeeper.versioning.Versioned; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Function1; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * New ReadAhead Reader that uses {@link com.twitter.distributedlog.logsegment.LogSegmentEntryReader}. + * + * NOTE: all the state changes happen in the same thread. All *unsafe* methods should be submitted to the order + * scheduler using stream name as the key. + */ +public class ReadAheadEntryReader implements + AsyncCloseable, + LogSegmentListener, + FutureEventListener<List<Entry.Reader>> { + + private static final Logger logger = LoggerFactory.getLogger(ReadAheadEntryReader.class); + + // + // Static Functions + // + + private static AbstractFunction1<LogSegmentEntryReader, BoxedUnit> START_READER_FUNC = new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() { + @Override + public BoxedUnit apply(LogSegmentEntryReader reader) { + reader.start(); + return BoxedUnit.UNIT; + } + }; + + // + // Internal Classes + // + + class SegmentReader implements FutureEventListener<LogSegmentEntryReader> { + + private LogSegmentMetadata metadata; + private final long startEntryId; + private Future<LogSegmentEntryReader> openFuture = null; + private LogSegmentEntryReader reader = null; + private boolean isStarted = false; + private boolean isClosed = false; + + SegmentReader(LogSegmentMetadata metadata, + long startEntryId) { + this.metadata = metadata; + this.startEntryId = startEntryId; + } + + synchronized LogSegmentEntryReader getEntryReader() { + return reader; + } + + synchronized boolean isBeyondLastAddConfirmed() { + return null != reader && reader.isBeyondLastAddConfirmed(); + } + + synchronized LogSegmentMetadata getSegment() { + return metadata; + } + + synchronized boolean isReaderOpen() { + return null != openFuture; + } + + synchronized void openReader() { + if (null != openFuture) { + return; + } + openFuture = entryStore.openReader(metadata, startEntryId).addEventListener(this); + } + + synchronized boolean isReaderStarted() { + return isStarted; + } + + synchronized void startRead() { + if (isStarted) { + return; + } + isStarted = true; + if (null != reader) { + reader.start(); + } else { + openFuture.onSuccess(START_READER_FUNC); + } + } + + synchronized Future<List<Entry.Reader>> readNext() { + if (null != reader) { + checkCatchingUpStatus(reader); + return reader.readNext(numReadAheadEntries); + } else { + return openFuture.flatMap(readFunc); + } + } + + synchronized void updateLogSegmentMetadata(final LogSegmentMetadata segment) { + if (null != reader) { + reader.onLogSegmentMetadataUpdated(segment); + this.metadata = segment; + } else { + openFuture.onSuccess(new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() { + @Override + public BoxedUnit apply(LogSegmentEntryReader reader) { + reader.onLogSegmentMetadataUpdated(segment); + synchronized (SegmentReader.this) { + SegmentReader.this.metadata = segment; + } + return BoxedUnit.UNIT; + } + }); + } + } + + @Override + synchronized public void onSuccess(LogSegmentEntryReader reader) { + this.reader = reader; + } + + @Override + public void onFailure(Throwable cause) { + // no-op, the failure will be propagated on first read. + } + + synchronized boolean isClosed() { + return isClosed; + } + + synchronized Future<Void> close() { + if (null == openFuture) { + return Future.Void(); + } + return openFuture.flatMap(new AbstractFunction1<LogSegmentEntryReader, Future<Void>>() { + @Override + public Future<Void> apply(LogSegmentEntryReader reader) { + return reader.asyncClose(); + } + }).ensure(new Function0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + synchronized (SegmentReader.this) { + isClosed = true; + } + return null; + } + }); + } + } + + private class ReadEntriesFunc extends AbstractFunction1<LogSegmentEntryReader, Future<List<Entry.Reader>>> { + + private final int numEntries; + + ReadEntriesFunc(int numEntries) { + this.numEntries = numEntries; + } + + @Override + public Future<List<Entry.Reader>> apply(LogSegmentEntryReader reader) { + checkCatchingUpStatus(reader); + return reader.readNext(numEntries); + } + } + + private abstract class CloseableRunnable implements Runnable { + + @Override + public void run() { + synchronized (ReadAheadEntryReader.this) { + if (null != closePromise) { + return; + } + safeRun(); + } + + } + + abstract void safeRun(); + + } + + // + // Functions + // + private final Function1<LogSegmentEntryReader, Future<List<Entry.Reader>>> readFunc; + private final Function0<BoxedUnit> removeClosedSegmentReadersFunc = new Function0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + removeClosedSegmentReaders(); + return BoxedUnit.UNIT; + } + }; + + // + // Resources + // + private final DistributedLogConfiguration conf; + private final BKLogReadHandler readHandler; + private final LogSegmentEntryStore entryStore; + private final OrderedScheduler scheduler; + + // + // Parameters + // + private final String streamName; + private final DLSN fromDLSN; + private final int maxCachedEntries; + private final int numReadAheadEntries; + private final int idleWarnThresholdMillis; + + // + // Cache + // + private final LinkedBlockingQueue<Entry.Reader> entryQueue; + + // + // State of the reader + // + + private boolean isInitialized; + private boolean readAheadPaused = false; + private Promise<Void> closePromise = null; + // segment readers + private long currentSegmentSequenceNumber; + private SegmentReader currentSegmentReader; + private SegmentReader nextSegmentReader; + private DLSN lastDLSN; + private final EntryPosition nextEntryPosition; + private volatile boolean isCatchingUp = true; + private final LinkedList<SegmentReader> segmentReaders; + private final LinkedList<SegmentReader> segmentReadersToClose; + // last exception that this reader encounters + private final AtomicReference<IOException> lastException = new AtomicReference<IOException>(null); + // last entry added time + private final Stopwatch lastEntryAddedTime; + // state change notification + private final CopyOnWriteArraySet<AsyncNotification> stateChangeNotifications = + new CopyOnWriteArraySet<AsyncNotification>(); + // idle reader check task + private final ScheduledFuture<?> idleReaderCheckTask; + + // + // Stats + // + private final AlertStatsLogger alertStatsLogger; + + public ReadAheadEntryReader(String streamName, + DLSN fromDLSN, + DistributedLogConfiguration conf, + BKLogReadHandler readHandler, + LogSegmentEntryStore entryStore, + OrderedScheduler scheduler, + Ticker ticker, + AlertStatsLogger alertStatsLogger) { + this.streamName = streamName; + this.fromDLSN = lastDLSN = fromDLSN; + this.nextEntryPosition = new EntryPosition( + fromDLSN.getLogSegmentSequenceNo(), + fromDLSN.getEntryId()); + this.conf = conf; + this.maxCachedEntries = conf.getReadAheadMaxRecords(); + this.numReadAheadEntries = conf.getReadAheadBatchSize(); + this.idleWarnThresholdMillis = conf.getReaderIdleWarnThresholdMillis(); + this.readHandler = readHandler; + this.entryStore = entryStore; + this.scheduler = scheduler; + this.readFunc = new ReadEntriesFunc(numReadAheadEntries); + this.alertStatsLogger = alertStatsLogger; + + // create the segment reader list + this.segmentReaders = new LinkedList<SegmentReader>(); + this.segmentReadersToClose = new LinkedList<SegmentReader>(); + // create the readahead entry queue + this.entryQueue = new LinkedBlockingQueue<Entry.Reader>(); + + // start the idle reader detection + lastEntryAddedTime = Stopwatch.createStarted(ticker); + // start the idle reader check task + idleReaderCheckTask = scheduleIdleReaderTaskIfNecessary(); + } + + private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() { + if (idleWarnThresholdMillis < Integer.MAX_VALUE && idleWarnThresholdMillis > 0) { + return scheduler.scheduleAtFixedRate(streamName, new Runnable() { + @Override + public void run() { + if (!isReaderIdle(idleWarnThresholdMillis, TimeUnit.MILLISECONDS)) { + return; + } + // the readahead has been idle + unsafeCheckIfReadAheadIsIdle(); + } + }, idleWarnThresholdMillis, idleWarnThresholdMillis, TimeUnit.MILLISECONDS); + } + return null; + } + + private void unsafeCheckIfReadAheadIsIdle() { + boolean forceReadLogSegments = + (null == currentSegmentReader) || currentSegmentReader.isBeyondLastAddConfirmed(); + if (forceReadLogSegments) { + readHandler.readLogSegmentsFromStore( + LogSegmentMetadata.COMPARATOR, + LogSegmentFilter.DEFAULT_FILTER, + null + ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + @Override + public void onFailure(Throwable cause) { + // do nothing here since it would be retried on next idle reader check task + } + + @Override + public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) { + onSegmentsUpdated(segments.getValue()); + } + }); + } + } + + private void cancelIdleReaderTask() { + if (null != idleReaderCheckTask) { + idleReaderCheckTask.cancel(true); + } + } + + @VisibleForTesting + EntryPosition getNextEntryPosition() { + return nextEntryPosition; + } + + @VisibleForTesting + SegmentReader getCurrentSegmentReader() { + return currentSegmentReader; + } + + @VisibleForTesting + long getCurrentSegmentSequenceNumber() { + return currentSegmentSequenceNumber; + } + + @VisibleForTesting + SegmentReader getNextSegmentReader() { + return nextSegmentReader; + } + + @VisibleForTesting + LinkedList<SegmentReader> getSegmentReaders() { + return segmentReaders; + } + + @VisibleForTesting + boolean isInitialized() { + return isInitialized; + } + + private void orderedSubmit(Runnable runnable) { + synchronized (this) { + if (null != closePromise) { + return; + } + } + try { + scheduler.submit(streamName, runnable); + } catch (RejectedExecutionException ree) { + logger.debug("Failed to submit and execute an operation for readhead entry reader of {}", + streamName, ree); + } + } + + public void start(final List<LogSegmentMetadata> segmentList) { + logger.info("Starting the readahead entry reader for {} : segments = {}", + readHandler.getFullyQualifiedName(), segmentList); + processLogSegments(segmentList); + } + + private void removeClosedSegmentReaders() { + orderedSubmit(new CloseableRunnable() { + @Override + void safeRun() { + unsafeRemoveClosedSegmentReaders(); + } + }); + } + + private void unsafeRemoveClosedSegmentReaders() { + SegmentReader reader = segmentReadersToClose.peekFirst(); + while (null != reader) { + if (reader.isClosed()) { + segmentReadersToClose.pollFirst(); + reader = segmentReadersToClose.peekFirst(); + } else { + break; + } + } + } + + @Override + public Future<Void> asyncClose() { + final Promise<Void> closeFuture; + synchronized (this) { + if (null != closePromise) { + return closePromise; + } + closePromise = closeFuture = new Promise<Void>(); + } + + // cancel the idle reader task + cancelIdleReaderTask(); + + // use runnable here instead of CloseableRunnable, + // because we need this to be executed + try { + scheduler.submit(streamName, new Runnable() { + @Override + public void run() { + unsafeAsyncClose(closeFuture); + } + }); + } catch (RejectedExecutionException ree) { + logger.warn("Scheduler has been shutdown before closing the readahead entry reader for stream {}", + streamName, ree); + unsafeAsyncClose(closeFuture); + } + + return closeFuture; + } + + private void unsafeAsyncClose(Promise<Void> closePromise) { + List<Future<Void>> closeFutures = Lists.newArrayListWithExpectedSize( + segmentReaders.size() + segmentReadersToClose.size() + 1); + if (null != currentSegmentReader) { + segmentReadersToClose.add(currentSegmentReader); + } + if (null != nextSegmentReader) { + segmentReadersToClose.add(nextSegmentReader); + } + for (SegmentReader reader : segmentReaders) { + segmentReadersToClose.add(reader); + } + segmentReaders.clear(); + for (SegmentReader reader : segmentReadersToClose) { + closeFutures.add(reader.close()); + } + Futures.collect(closeFutures).proxyTo(closePromise); + } + + // + // Reader State Changes + // + + ReadAheadEntryReader addStateChangeNotification(AsyncNotification notification) { + this.stateChangeNotifications.add(notification); + return this; + } + + ReadAheadEntryReader removeStateChangeNotification(AsyncNotification notification) { + this.stateChangeNotifications.remove(notification); + return this; + } + + private void notifyStateChangeOnSuccess() { + for (AsyncNotification notification : stateChangeNotifications) { + notification.notifyOnOperationComplete(); + } + } + + private void notifyStateChangeOnFailure(Throwable cause) { + for (AsyncNotification notification : stateChangeNotifications) { + notification.notifyOnError(cause); + } + } + + void setLastException(IOException cause) { + if (!lastException.compareAndSet(null, cause)) { + return; + } + // the exception is set and notify the state change + notifyStateChangeOnFailure(cause); + } + + void checkLastException() throws IOException { + if (null != lastException.get()) { + throw lastException.get(); + } + } + + void checkCatchingUpStatus(LogSegmentEntryReader reader) { + if (reader.getSegment().isInProgress() + && isCatchingUp + && reader.hasCaughtUpOnInprogress()) { + logger.info("ReadAhead for {} is caught up at entry {} @ log segment {}.", + new Object[] { readHandler.getFullyQualifiedName(), + reader.getLastAddConfirmed(), reader.getSegment() }); + isCatchingUp = false; + } + } + + public boolean isReadAheadCaughtUp() { + return !isCatchingUp; + } + + // + // ReadAhead State Machine + // + + @Override + public void onSuccess(List<Entry.Reader> entries) { + lastEntryAddedTime.reset().start(); + for (Entry.Reader entry : entries) { + entryQueue.add(entry); + } + if (!entries.isEmpty()) { + Entry.Reader lastEntry = entries.get(entries.size() - 1); + nextEntryPosition.advance(lastEntry.getLSSN(), lastEntry.getEntryId() + 1); + } + // notify on data available + notifyStateChangeOnSuccess(); + if (entryQueue.size() >= maxCachedEntries) { + pauseReadAheadOnCacheFull(); + } else { + scheduleReadNext(); + } + } + + @Override + public void onFailure(Throwable cause) { + if (cause instanceof EndOfLogSegmentException) { + // we reach end of the log segment + moveToNextLogSegment(); + return; + } + if (cause instanceof IOException) { + setLastException((IOException) cause); + } else { + setLastException(new UnexpectedException("Unexpected non I/O exception", cause)); + } + } + + private synchronized void invokeReadAhead() { + if (readAheadPaused) { + scheduleReadNext(); + readAheadPaused = false; + } + } + + private synchronized void pauseReadAheadOnCacheFull() { + this.readAheadPaused = true; + if (!isCacheFull()) { + invokeReadAhead(); + } + } + + private synchronized void pauseReadAheadOnNoMoreLogSegments() { + this.readAheadPaused = true; + } + + // + // Cache Related Methods + // + + public Entry.Reader getNextReadAheadEntry(long waitTime, TimeUnit waitTimeUnit) throws IOException { + if (null != lastException.get()) { + throw lastException.get(); + } + Entry.Reader entry; + try { + entry = entryQueue.poll(waitTime, waitTimeUnit); + } catch (InterruptedException e) { + throw new DLInterruptedException("Interrupted on waiting next readahead entry : ", e); + } + try { + return entry; + } finally { + // resume readahead if the cache becomes empty + if (null != entry && !isCacheFull()) { + invokeReadAhead(); + } + } + } + + /** + * Return number cached entries. + * + * @return number cached entries. + */ + public int getNumCachedEntries() { + return entryQueue.size(); + } + + /** + * Return if the cache is full. + * + * @return true if the cache is full, otherwise false. + */ + public boolean isCacheFull() { + return getNumCachedEntries() >= maxCachedEntries; + } + + @VisibleForTesting + public boolean isCacheEmpty() { + return entryQueue.isEmpty(); + } + + /** + * Check whether the readahead becomes stall. + * + * @param idleReaderErrorThreshold idle reader error threshold + * @param timeUnit time unit of the idle reader error threshold + * @return true if the readahead becomes stall, otherwise false. + */ + public boolean isReaderIdle(int idleReaderErrorThreshold, TimeUnit timeUnit) { + return (lastEntryAddedTime.elapsed(timeUnit) > idleReaderErrorThreshold); + } + + // + // LogSegment Management + // + + void processLogSegments(final List<LogSegmentMetadata> segments) { + orderedSubmit(new CloseableRunnable() { + @Override + void safeRun() { + unsafeProcessLogSegments(segments); + } + }); + } + + private void unsafeProcessLogSegments(List<LogSegmentMetadata> segments) { + if (isInitialized) { + unsafeReinitializeLogSegments(segments); + } else { + unsafeInitializeLogSegments(segments); + } + } + + /** + * Update the log segment metadata. + * + * @param reader the reader to update the metadata + * @param newMetadata the new metadata received + * @return true if successfully, false on encountering errors + */ + private boolean updateLogSegmentMetadata(SegmentReader reader, + LogSegmentMetadata newMetadata) { + if (reader.getSegment().getLogSegmentSequenceNumber() != newMetadata.getLogSegmentSequenceNumber()) { + setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for " + + streamName + " : current segment = " + reader.getSegment() + ", new segment = " + newMetadata)); + return false; + } + if (!reader.getSegment().isInProgress() && newMetadata.isInProgress()) { + setLastException(new DLIllegalStateException("An inprogress log segment " + newMetadata + + " received after a closed log segment " + reader.getSegment() + " on reading segment " + + newMetadata.getLogSegmentSequenceNumber() + " @ stream " + streamName)); + return false; + } + if (reader.getSegment().isInProgress() && !newMetadata.isInProgress()) { + reader.updateLogSegmentMetadata(newMetadata); + } + return true; + } + + /** + * Reinitialize the log segments + */ + private void unsafeReinitializeLogSegments(List<LogSegmentMetadata> segments) { + logger.info("Reinitialize log segments with {}", segments); + int segmentIdx = 0; + for (; segmentIdx < segments.size(); segmentIdx++) { + LogSegmentMetadata segment = segments.get(segmentIdx); + if (segment.getLogSegmentSequenceNumber() < currentSegmentSequenceNumber) { + continue; + } + break; + } + if (segmentIdx >= segments.size()) { + return; + } + LogSegmentMetadata segment = segments.get(segmentIdx); + if (null != currentSegmentReader) { + if (!updateLogSegmentMetadata(currentSegmentReader, segment)) { + return; + } + } else { + if (currentSegmentSequenceNumber != segment.getLogSegmentSequenceNumber()) { + setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for " + + streamName + " : current segment sn = " + currentSegmentSequenceNumber + + ", new segment sn = " + segment.getLogSegmentSequenceNumber())); + return; + } + } + segmentIdx++; + if (segmentIdx >= segments.size()) { + return; + } + // check next segment + segment = segments.get(segmentIdx); + if (null != nextSegmentReader) { + if (!updateLogSegmentMetadata(nextSegmentReader, segment)) { + return; + } + segmentIdx++; + } + // check the segment readers in the queue + for (int readerIdx = 0; + readerIdx < segmentReaders.size() && segmentIdx < segments.size(); + readerIdx++, segmentIdx++) { + SegmentReader reader = segmentReaders.get(readerIdx); + segment = segments.get(segmentIdx); + if (!updateLogSegmentMetadata(reader, segment)) { + return; + } + } + // add the remaining segments to the reader queue + for (; segmentIdx < segments.size(); segmentIdx++) { + segment = segments.get(segmentIdx); + SegmentReader reader = new SegmentReader(segment, 0L); + reader.openReader(); + segmentReaders.add(reader); + } + if (null == currentSegmentReader) { + unsafeMoveToNextLogSegment(); + } + // resume readahead if necessary + invokeReadAhead(); + } + + /** + * Initialize the reader with the log <i>segments</i>. + * + * @param segments list of log segments + */ + private void unsafeInitializeLogSegments(List<LogSegmentMetadata> segments) { + if (segments.isEmpty()) { + // not initialize the background reader, until the first log segment is notified + return; + } + boolean skipTruncatedLogSegments = true; + DLSN dlsnToStart = fromDLSN; + // positioning the reader + for (int i = 0; i < segments.size(); i++) { + LogSegmentMetadata segment = segments.get(i); + // skip any log segments that have smaller log segment sequence numbers + if (segment.getLogSegmentSequenceNumber() < fromDLSN.getLogSegmentSequenceNo()) { + continue; + } + // if the log segment is truncated, skip it. + if (skipTruncatedLogSegments && + !conf.getIgnoreTruncationStatus() && + segment.isTruncated()) { + continue; + } + // if the log segment is partially truncated, move the start dlsn to the min active dlsn + if (skipTruncatedLogSegments && + !conf.getIgnoreTruncationStatus() && + segment.isPartiallyTruncated()) { + if (segment.getMinActiveDLSN().compareTo(fromDLSN) > 0) { + dlsnToStart = segment.getMinActiveDLSN(); + } + } + skipTruncatedLogSegments = false; + if (!isAllowedToPosition(segment, dlsnToStart)) { + return; + } + + SegmentReader reader = new SegmentReader(segment, + segment.getLogSegmentSequenceNumber() == dlsnToStart.getLogSegmentSequenceNo() + ? dlsnToStart.getEntryId() : 0L); + segmentReaders.add(reader); + } + if (segmentReaders.isEmpty()) { + // not initialize the background reader, until the first log segment is available to read + return; + } + currentSegmentReader = segmentReaders.pollFirst(); + currentSegmentReader.openReader(); + currentSegmentReader.startRead(); + currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber(); + unsafeReadNext(currentSegmentReader); + if (!segmentReaders.isEmpty()) { + for (SegmentReader reader : segmentReaders) { + reader.openReader(); + } + unsafePrefetchNextSegment(true); + } + // mark the reader initialized + isInitialized = true; + } + + private void unsafePrefetchNextSegment(boolean onlyInprogressLogSegment) { + SegmentReader nextReader = segmentReaders.peekFirst(); + // open the next log segment if it is inprogress + if (null != nextReader) { + if (onlyInprogressLogSegment && !nextReader.getSegment().isInProgress()) { + return; + } + nextReader.startRead(); + nextSegmentReader = nextReader; + segmentReaders.pollFirst(); + } + } + + /** + * Check if we are allowed to position the reader at <i>fromDLSN</i>. + * + * @return true if it is allowed, otherwise false. + */ + private boolean isAllowedToPosition(LogSegmentMetadata segment, DLSN fromDLSN) { + if (segment.isTruncated() + && segment.getLastDLSN().compareTo(fromDLSN) >= 0 + && !conf.getIgnoreTruncationStatus()) { + setLastException(new AlreadyTruncatedTransactionException(streamName + + " : trying to position read ahead at " + fromDLSN + + " on a segment " + segment + " that is already marked as truncated")); + return false; + } + if (segment.isPartiallyTruncated() && + segment.getMinActiveDLSN().compareTo(fromDLSN) > 0) { + if (conf.getAlertWhenPositioningOnTruncated()) { + alertStatsLogger.raise("Trying to position reader on {} when {} is marked partially truncated", + fromDLSN, segment); + } + if (!conf.getIgnoreTruncationStatus()) { + logger.error("{}: Trying to position reader on {} when {} is marked partially truncated", + new Object[]{ streamName, fromDLSN, segment }); + + setLastException(new AlreadyTruncatedTransactionException(streamName + + " : trying to position read ahead at " + fromDLSN + + " on a segment " + segment + " that is already marked as truncated")); + return false; + } + } + return true; + } + + void moveToNextLogSegment() { + orderedSubmit(new CloseableRunnable() { + @Override + void safeRun() { + unsafeMoveToNextLogSegment(); + } + }); + } + + private void unsafeMoveToNextLogSegment() { + if (null != currentSegmentReader) { + segmentReadersToClose.add(currentSegmentReader); + currentSegmentReader.close().ensure(removeClosedSegmentReadersFunc); + logger.debug("close current segment reader {}", currentSegmentReader.getSegment()); + currentSegmentReader = null; + } + boolean hasSegmentToRead = false; + if (null != nextSegmentReader) { + currentSegmentReader = nextSegmentReader; + logger.debug("move to read segment {}", currentSegmentReader.getSegment()); + currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber(); + nextSegmentReader = null; + // start reading + unsafeReadNext(currentSegmentReader); + unsafePrefetchNextSegment(true); + hasSegmentToRead = true; + } else { + unsafePrefetchNextSegment(false); + if (null != nextSegmentReader) { + currentSegmentReader = nextSegmentReader; + logger.debug("move to read segment {}", currentSegmentReader.getSegment()); + currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber(); + nextSegmentReader = null; + unsafeReadNext(currentSegmentReader); + unsafePrefetchNextSegment(true); + hasSegmentToRead = true; + } + } + if (!hasSegmentToRead) { // no more segment to read, wait until new log segment arrive + if (isCatchingUp) { + logger.info("ReadAhead for {} is caught up and no log segments to read now", + readHandler.getFullyQualifiedName()); + isCatchingUp = false; + } + pauseReadAheadOnNoMoreLogSegments(); + } + } + + void scheduleReadNext() { + orderedSubmit(new CloseableRunnable() { + @Override + void safeRun() { + if (null == currentSegmentReader) { + pauseReadAheadOnNoMoreLogSegments(); + return; + } + unsafeReadNext(currentSegmentReader); + } + }); + } + + private void unsafeReadNext(SegmentReader reader) { + reader.readNext().addEventListener(this); + } + + @Override + public void onSegmentsUpdated(List<LogSegmentMetadata> segments) { + logger.info("segments is updated with {}", segments); + processLogSegments(segments); + } + + @Override + public void onLogStreamDeleted() { + setLastException(new LogNotFoundException("Log stream " + + streamName + " is deleted")); + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java index fd3b63f..be8e1b5 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java @@ -27,6 +27,7 @@ import com.twitter.distributedlog.exceptions.DLIllegalStateException; import com.twitter.distributedlog.exceptions.DLInterruptedException; import com.twitter.distributedlog.exceptions.EndOfLogSegmentException; import com.twitter.distributedlog.exceptions.ReadCancelledException; +import com.twitter.distributedlog.injector.AsyncFailureInjector; import com.twitter.distributedlog.logsegment.LogSegmentEntryReader; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; @@ -37,14 +38,16 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.StatsLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Enumeration; +import java.util.LinkedList; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -128,6 +131,16 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) { + if (failureInjector.shouldInjectCorruption(entryId, entryId)) { + rc = BKException.Code.DigestMatchException; + } + processReadEntries(rc, lh, entries, ctx); + } + + void processReadEntries(int rc, + LedgerHandle lh, + Enumeration<LedgerEntry> entries, + Object ctx) { if (isDone()) { return; } @@ -155,6 +168,16 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, long entryId, LedgerEntry entry, Object ctx) { + if (failureInjector.shouldInjectCorruption(this.entryId, this.entryId)) { + rc = BKException.Code.DigestMatchException; + } + processReadEntry(rc, entryId, entry, ctx); + } + + void processReadEntry(int rc, + long entryId, + LedgerEntry entry, + Object ctx) { if (isDone()) { return; } @@ -245,10 +268,10 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, private final BookKeeper bk; private final DistributedLogConfiguration conf; private final OrderedScheduler scheduler; - private final long startEntryId; private final long lssn; private final long startSequenceId; private final boolean envelopeEntries; + private final boolean deserializeRecordSet; private final int numPrefetchEntries; private final int maxPrefetchEntries; // state @@ -260,29 +283,39 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, private long nextEntryId; private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(null); private final AtomicLong scheduleCount = new AtomicLong(0); + private volatile boolean hasCaughtupOnInprogress = false; // read retries private int readAheadWaitTime; private final int maxReadBackoffTime; private final AtomicInteger numReadErrors = new AtomicInteger(0); + private final boolean skipBrokenEntries; // readahead cache int cachedEntries = 0; int numOutstandingEntries = 0; final LinkedBlockingQueue<CacheEntry> readAheadEntries; // request queue - final ConcurrentLinkedQueue<PendingReadRequest> readQueue; + final LinkedList<PendingReadRequest> readQueue; + + // failure injector + private final AsyncFailureInjector failureInjector; + // Stats + private final Counter skippedBrokenEntriesCounter; BKLogSegmentEntryReader(LogSegmentMetadata metadata, LedgerHandle lh, long startEntryId, BookKeeper bk, OrderedScheduler scheduler, - DistributedLogConfiguration conf) { + DistributedLogConfiguration conf, + StatsLogger statsLogger, + AsyncFailureInjector failureInjector) { this.metadata = metadata; this.lssn = metadata.getLogSegmentSequenceNumber(); this.startSequenceId = metadata.getStartSequenceId(); this.envelopeEntries = metadata.getEnvelopeEntries(); + this.deserializeRecordSet = conf.getDeserializeRecordSetOnReads(); this.lh = lh; - this.startEntryId = this.nextEntryId = Math.max(startEntryId, 0); + this.nextEntryId = Math.max(startEntryId, 0); this.bk = bk; this.conf = conf; this.numPrefetchEntries = conf.getNumPrefetchEntriesPerLogSegment(); @@ -294,17 +327,35 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, // create the readahead queue this.readAheadEntries = new LinkedBlockingQueue<CacheEntry>(); // create the read request queue - this.readQueue = new ConcurrentLinkedQueue<PendingReadRequest>(); + this.readQueue = new LinkedList<PendingReadRequest>(); // read backoff settings this.readAheadWaitTime = conf.getReadAheadWaitTime(); this.maxReadBackoffTime = 4 * conf.getReadAheadWaitTime(); + // other read settings + this.skipBrokenEntries = conf.getReadAheadSkipBrokenEntries(); + + // Failure Injection + this.failureInjector = failureInjector; + // Stats + this.skippedBrokenEntriesCounter = statsLogger.getCounter("skipped_broken_entries"); + } + + @VisibleForTesting + public synchronized CacheEntry getOutstandingLongPoll() { + return outstandingLongPoll; + } + + @VisibleForTesting + LinkedBlockingQueue<CacheEntry> getReadAheadEntries() { + return this.readAheadEntries; } synchronized LedgerHandle getLh() { return lh; } - synchronized LogSegmentMetadata getSegment() { + @Override + public synchronized LogSegmentMetadata getSegment() { return metadata; } @@ -318,6 +369,11 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, prefetchIfNecessary(); } + @Override + public boolean hasCaughtUpOnInprogress() { + return hasCaughtupOnInprogress; + } + // // Process on Log Segment Metadata Updates // @@ -425,10 +481,15 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, } private void cancelAllPendingReads(Throwable throwExc) { - for (PendingReadRequest request : readQueue) { + List<PendingReadRequest> requestsToCancel; + synchronized (readQueue) { + requestsToCancel = Lists.newArrayListWithExpectedSize(readQueue.size()); + requestsToCancel.addAll(readQueue); + readQueue.clear(); + } + for (PendingReadRequest request : requestsToCancel) { request.setException(throwExc); } - readQueue.clear(); } // @@ -475,14 +536,15 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, (!isLedgerClosed() && nextEntryId > getLastAddConfirmed() + 1)) { break; } - entriesToFetch.add(new CacheEntry(nextEntryId)); + CacheEntry entry = new CacheEntry(nextEntryId); + entriesToFetch.add(entry); + readAheadEntries.add(entry); ++numOutstandingEntries; ++cachedEntries; ++nextEntryId; } } for (CacheEntry entry : entriesToFetch) { - readAheadEntries.add(entry); issueRead(entry); } } @@ -518,6 +580,10 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, synchronized (this) { this.outstandingLongPoll = cacheEntry; } + + if (!hasCaughtupOnInprogress) { + hasCaughtupOnInprogress = true; + } getLh().asyncReadLastConfirmedAndEntry( cacheEntry.entryId, conf.getReadLACLongPollTimeout(), @@ -535,7 +601,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, .setLogSegmentInfo(lssn, startSequenceId) .setEntryId(entry.getEntryId()) .setEnvelopeEntry(envelopeEntries) - .deserializeRecordSet(false) + .deserializeRecordSet(deserializeRecordSet) .setInputStream(entry.getEntryInputStream()) .buildReader(); } @@ -635,12 +701,18 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, private void readEntriesFromReadAheadCache(PendingReadRequest nextRequest) { while (!nextRequest.hasReadEnoughEntries()) { - CacheEntry entry = readAheadEntries.peek(); - // no entry available in the read ahead cache + CacheEntry entry; + boolean hitEndOfLogSegment; + synchronized (this) { + entry = readAheadEntries.peek(); + hitEndOfLogSegment = (null == entry) && isEndOfLogSegment(); + } + // reach end of log segment + if (hitEndOfLogSegment) { + setException(new EndOfLogSegmentException(getSegment().getZNodeName()), false); + return; + } if (null == entry) { - if (isEndOfLogSegment()) { - setException(new EndOfLogSegmentException(getSegment().getZNodeName()), false); - } return; } // entry is not complete yet. @@ -665,6 +737,11 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, setException(e, false); return; } + } else if (skipBrokenEntries && BKException.Code.DigestMatchException == entry.getRc()) { + // skip this entry and move forward + skippedBrokenEntriesCounter.inc(); + readAheadEntries.poll(); + continue; } else { setException(new BKTransmitException("Encountered issue on reading entry " + entry.getEntryId() + " @ log segment " + getSegment(), entry.getRc()), false); @@ -685,7 +762,8 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, return isLedgerClosed() && entryId > getLastAddConfirmed(); } - private synchronized boolean isBeyondLastAddConfirmed() { + @Override + public synchronized boolean isBeyondLastAddConfirmed() { return isBeyondLastAddConfirmed(nextEntryId); } @@ -693,10 +771,6 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, return entryId > getLastAddConfirmed(); } - private synchronized boolean isNotBeyondLastAddConfirmed() { - return isNotBeyondLastAddConfirmed(nextEntryId); - } - private boolean isNotBeyondLastAddConfirmed(long entryId) { return entryId <= getLastAddConfirmed(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java new file mode 100644 index 0000000..dc96a80 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.impl.logsegment; + +import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.LogSegmentMetadata; +import com.twitter.distributedlog.exceptions.BKTransmitException; +import com.twitter.distributedlog.injector.AsyncFailureInjector; +import com.twitter.distributedlog.logsegment.LogSegmentEntryReader; +import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; +import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; +import com.twitter.distributedlog.util.FutureUtils; +import com.twitter.distributedlog.util.OrderedScheduler; +import com.twitter.util.Future; +import com.twitter.util.Promise; +import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.stats.StatsLogger; + +import static com.google.common.base.Charsets.UTF_8; + +/** + * BookKeeper Based Entry Store + */ +public class BKLogSegmentEntryStore implements LogSegmentEntryStore, AsyncCallback.OpenCallback { + + private static class OpenReaderRequest { + + private final LogSegmentMetadata segment; + private final long startEntryId; + private final Promise<LogSegmentEntryReader> openPromise; + + OpenReaderRequest(LogSegmentMetadata segment, + long startEntryId) { + this.segment = segment; + this.startEntryId = startEntryId; + this.openPromise = new Promise<LogSegmentEntryReader>(); + } + + } + + private final byte[] passwd; + private final BookKeeper bk; + private final OrderedScheduler scheduler; + private final DistributedLogConfiguration conf; + private final StatsLogger statsLogger; + private final AsyncFailureInjector failureInjector; + + public BKLogSegmentEntryStore(DistributedLogConfiguration conf, + BookKeeper bk, + OrderedScheduler scheduler, + StatsLogger statsLogger, + AsyncFailureInjector failureInjector) { + this.conf = conf; + this.bk = bk; + this.passwd = conf.getBKDigestPW().getBytes(UTF_8); + this.scheduler = scheduler; + this.statsLogger = statsLogger; + this.failureInjector = failureInjector; + } + + @Override + public Future<LogSegmentEntryWriter> openWriter(LogSegmentMetadata segment) { + throw new UnsupportedOperationException("Not supported yet"); + } + + @Override + public Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment, + long startEntryId) { + OpenReaderRequest request = new OpenReaderRequest(segment, startEntryId); + if (segment.isInProgress()) { + bk.asyncOpenLedgerNoRecovery( + segment.getLedgerId(), + BookKeeper.DigestType.CRC32, + passwd, + this, + request); + } else { + bk.asyncOpenLedger( + segment.getLedgerId(), + BookKeeper.DigestType.CRC32, + passwd, + this, + request); + } + return request.openPromise; + } + + @Override + public void openComplete(int rc, LedgerHandle lh, Object ctx) { + OpenReaderRequest request = (OpenReaderRequest) ctx; + if (BKException.Code.OK != rc) { + FutureUtils.setException( + request.openPromise, + new BKTransmitException("Failed to open ledger handle for log segment " + request.segment, rc)); + return; + } + // successfully open a ledger + LogSegmentEntryReader reader = new BKLogSegmentEntryReader( + request.segment, + lh, + request.startEntryId, + bk, + scheduler, + conf, + statsLogger, + failureInjector); + FutureUtils.setValue(request.openPromise, reader); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncFailureInjector.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncFailureInjector.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncFailureInjector.java index ef67266..4145040 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncFailureInjector.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncFailureInjector.java @@ -59,7 +59,7 @@ public interface AsyncFailureInjector { } @Override - public boolean shouldInjectCorruption() { + public boolean shouldInjectCorruption(long startEntryId, long endEntryId) { return false; } @@ -122,7 +122,10 @@ public interface AsyncFailureInjector { /** * Return the flag indicating if should inject corruption. * + * @param startEntryId the start entry id + * @param endEntryId the end entry id * @return true to inject corruption otherwise false. */ - boolean shouldInjectCorruption(); + boolean shouldInjectCorruption(long startEntryId, long endEntryId); + } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncRandomFailureInjector.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncRandomFailureInjector.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncRandomFailureInjector.java index 8928494..f3bfea9 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncRandomFailureInjector.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncRandomFailureInjector.java @@ -153,8 +153,19 @@ public class AsyncRandomFailureInjector implements AsyncFailureInjector { } @Override - public boolean shouldInjectCorruption() { - return simulateCorruption; + public boolean shouldInjectCorruption(long startEntryId, long endEntryId) { + if (!simulateCorruption) { + return false; + } + if (startEntryId == endEntryId) { + return startEntryId % 10 == 0; + } + for (long i = startEntryId; i <= endEntryId; i++) { + if (i % 10 == 0) { + return true; + } + } + return false; } @Override http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java index d43f3d8..07387cb 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java @@ -39,6 +39,13 @@ public interface LogSegmentEntryReader extends AsyncCloseable { void start(); /** + * Return the log segment metadata for this reader. + * + * @return the log segment metadata + */ + LogSegmentMetadata getSegment(); + + /** * Update the log segment each time when the metadata has changed. * * @param segment new metadata of the log segment. @@ -64,4 +71,18 @@ public interface LogSegmentEntryReader extends AsyncCloseable { */ long getLastAddConfirmed(); + /** + * Is the reader reading beyond last add confirmed. + * + * @return true if the reader is reading beyond last add confirmed + */ + boolean isBeyondLastAddConfirmed(); + + /** + * Has the log segment reader caught up with the inprogress log segment. + * + * @return true only if the log segment is inprogress and it is caught up, otherwise return false. + */ + boolean hasCaughtUpOnInprogress(); + } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java index ff47691..d8611f9 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java @@ -39,8 +39,10 @@ public interface LogSegmentEntryStore { * Open the reader for reading data to the log <i>segment</i>. * * @param segment the log <i>segment</i> to read data from + * @param startEntryId the start entry id * @return future represent the opened reader */ - Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment); + Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment, + long startEntryId); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java index 2f9869e..a77f753 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java @@ -62,4 +62,4 @@ public class MovingAverageRateFactory { avg.sample(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java index 6c55014..5161b91 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java @@ -1245,8 +1245,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea public void onSuccess(Enumeration<LedgerEntry> entries) { int rc = BKException.Code.OK; - // If the range includes an entry id that is a multiple of 10, simulate corruption. - if (failureInjector.shouldInjectCorruption() && rangeContainsSimulatedBrokenEntry(startEntryId, endEntryId)) { + if (failureInjector.shouldInjectCorruption(startEntryId, endEntryId)) { rc = BKException.Code.DigestMatchException; } readComplete(rc, null, entries, readCtx, startEntryId, endEntryId); @@ -1259,15 +1258,6 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea }); } - private boolean rangeContainsSimulatedBrokenEntry(long start, long end) { - for (long i = start; i <= end; i++) { - if (i % 10 == 0) { - return true; - } - } - return false; - } - public void readComplete(final int rc, final LedgerHandle lh, final Enumeration<LedgerEntry> seq, final Object ctx, final long startEntryId, final long endEntryId) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java index 9f34902..287bd6d 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java @@ -475,6 +475,14 @@ public class OrderedScheduler implements ScheduledExecutorService { return chooseExecutor(key).schedule(command, delay, unit); } + public ScheduledFuture<?> scheduleAtFixedRate(Object key, + Runnable command, + long initialDelay, + long period, + TimeUnit unit) { + return chooseExecutor(key).scheduleAtFixedRate(command, initialDelay, period, unit); + } + public Future<?> submit(Object key, Runnable command) { return chooseExecutor(key).submit(command); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java index 47cabba..9927616 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java @@ -81,7 +81,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { assertEquals(0L, record.getSequenceId()); DLMTestUtil.verifyLogRecord(record); - String readLockPath = reader1.bkLedgerManager.getReadLockPath(); + String readLockPath = reader1.readHandler.getReadLockPath(); Utils.close(reader1); // simulate a old stream created without readlock path http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java index 28f7a74..95d760e 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java @@ -140,6 +140,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { } record = reader.readNext(false); } + reader.close(); assertEquals(3 * 9, numTrans); assertEquals(3 * 9, readDlm.getLogRecordCount()); readDlm.close(); @@ -339,7 +340,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { assertTrue(value.getSequenceId() < 0); assertTrue(value.getSequenceId() > startSequenceId); } - LOG.debug("Recevied record {} from {}", value.getDlsn(), reader.getStreamName()); + LOG.info("Received record {} from {}", value, reader.getStreamName()); assertTrue(!value.isControl()); assertTrue(value.getDlsn().getSlotId() == 0); assertTrue(value.getDlsn().compareTo(startPosition) >= 0); @@ -366,7 +367,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { } @Override public void onFailure(Throwable cause) { - LOG.debug("Encountered Exception on reading {}", reader.getStreamName(), cause); + LOG.error("Encountered Exception on reading {}", reader.getStreamName(), cause); errorsFound.set(true); completionLatch.countDown(); } @@ -806,6 +807,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { readerSyncLatch.await(); assertTrue("Should position reader at least once", reader.getNumReaderPositions().get() > 1); + reader.stop(); dlm.close(); } @@ -906,7 +908,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { confLocal.setOutputBufferSize(1024); DistributedLogManager dlm = createNewDLM(confLocal, name); - int numLogSegments = 20; + int numLogSegments = 5; int numRecordsPerLogSegment = 10; final CountDownLatch doneLatch = new CountDownLatch(1); @@ -924,7 +926,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { reader.start(); - final CountDownLatch writeLatch = new CountDownLatch(200); + final CountDownLatch writeLatch = new CountDownLatch(numLogSegments * numRecordsPerLogSegment); final AtomicBoolean writeErrors = new AtomicBoolean(false); int txid = 1; @@ -949,6 +951,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { syncLatch.await(); assertTrue("Should position reader at least once", reader.getNumReaderPositions().get() > 1); + reader.stop(); dlm.close(); } @@ -1341,7 +1344,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { confLocal.setImmediateFlushEnabled(true); confLocal.setReadAheadBatchSize(1); confLocal.setReadAheadMaxRecords(1); - confLocal.setReaderIdleWarnThresholdMillis(50); + confLocal.setReaderIdleWarnThresholdMillis(0); confLocal.setReaderIdleErrorThresholdMillis(idleReaderErrorThreshold); final DistributedLogManager dlm = createNewDLM(confLocal, name); final Thread currentThread = Thread.currentThread(); @@ -1424,6 +1427,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { Assert.assertEquals(segmentSize, recordCount); } assertFalse(currentThread.isInterrupted()); + Utils.close(reader); executor.shutdown(); } @@ -1512,7 +1516,6 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { try { int txid = 1; for (long i = 0; i < numSegments; i++) { - long start = txid; BKSyncLogWriter writer = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); for (long j = 1; j <= segmentSize; j++) { writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++)); @@ -1558,6 +1561,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { assertTrue(!exceptionEncountered); Assert.assertEquals(recordCount, segmentSize * numSegments); assertTrue(!currentThread.isInterrupted()); + Utils.close(reader); executor.shutdown(); } @@ -1617,19 +1621,20 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { LOG.info("Read record {}", record); assertEquals(1L, record.getTransactionId()); - assertNotNull(reader.bkLedgerManager.readAheadWorker); - assertTrue(reader.bkLedgerManager.readAheadCache.getNumCachedEntries() <= maxAllowedCachedRecords); + assertNotNull(reader.getReadAheadReader()); + assertTrue(reader.getReadAheadReader().getNumCachedEntries() <= maxAllowedCachedRecords); for (int i = 2; i <= numRecords; i++) { record = Await.result(reader.readNext()); LOG.info("Read record {}", record); assertEquals((long) i, record.getTransactionId()); TimeUnit.MILLISECONDS.sleep(20); - int numCachedEntries = reader.bkLedgerManager.readAheadCache.getNumCachedEntries(); + int numCachedEntries = reader.getReadAheadReader().getNumCachedEntries(); assertTrue("Should cache less than " + batchSize + " records but already found " + numCachedEntries + " records when reading " + i + "th record", numCachedEntries <= maxAllowedCachedRecords); } + Utils.close(reader); } @Test(timeout = 60000) @@ -1675,6 +1680,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { fail("Should have thrown"); } catch (EndOfStreamException ex) { } + Utils.close(reader); } @Test(timeout = 60000) @@ -1694,6 +1700,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { fail("Should have thrown"); } catch (EndOfStreamException ex) { } + writer.close(); BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN); try { @@ -1701,6 +1708,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { fail("Should have thrown"); } catch (EndOfStreamException ex) { } + Utils.close(reader); } @Test(timeout = 60000) @@ -1863,7 +1871,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { } catch (DLIllegalStateException e) { } - reader.asyncClose(); + Utils.close(reader); dlm.close(); } @@ -2096,6 +2104,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { assertEquals(i+1, record.getPositionWithinLogSegment()); assertArrayEquals(DLMTestUtil.generatePayload(i+1), record.getPayload()); } + Utils.close(reader1); + readDLM1.close(); DistributedLogConfiguration readConf2 = new DistributedLogConfiguration(); readConf2.addConfiguration(confLocal); @@ -2124,6 +2134,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { assertEquals(5, LogRecordSet.numRecords(record)); } } + Utils.close(reader2); + readDLM2.close(); } @Test(timeout = 60000) @@ -2152,6 +2164,9 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { } catch (IdleReaderException ire) { // expected } + Utils.close(reader); + writer.close(); + dlm.close(); } @Test(timeout = 60000) @@ -2177,5 +2192,9 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { LogRecordWithDLSN record = FutureUtils.result(reader.readNext()); assertEquals(1L, record.getTransactionId()); DLMTestUtil.verifyLogRecord(record); + + Utils.close(reader); + writer.close(); + dlm.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java index c029dca..54177c8 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java @@ -162,7 +162,7 @@ public class TestBKSyncLogReader extends TestDistributedLogBase { BKSyncLogReaderDLSN reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1L); // wait until readahead caught up - while (!reader.getReadHandler().isReadAheadCaughtUp()) { + while (!reader.getReadAheadReader().isReadAheadCaughtUp()) { TimeUnit.MILLISECONDS.sleep(20); } @@ -178,8 +178,7 @@ public class TestBKSyncLogReader extends TestDistributedLogBase { logger.info("Write another 10 records"); // wait until readahead move on - while (reader.getReadHandler() - .readAheadWorker.getNextReadAheadPosition().getEntryId() < 21) { + while (reader.getReadAheadReader().getNextEntryPosition().getEntryId() < 21) { TimeUnit.MILLISECONDS.sleep(20); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEntryPosition.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEntryPosition.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEntryPosition.java new file mode 100644 index 0000000..384d1e8 --- /dev/null +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEntryPosition.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog; + +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * Test Case for {@link EntryPosition} + */ +public class TestEntryPosition { + + private void checkPosition(EntryPosition position, + long lssn, + long entryId) { + assertEquals(position.getLogSegmentSequenceNumber(), lssn); + assertEquals(position.getEntryId(), entryId); + } + + @Test + public void testAdvance() { + EntryPosition position = new EntryPosition(9L, 99L); + + checkPosition(position, 9L, 99L); + + // advance (8L, 100L) takes no effect + assertFalse(position.advance(8L, 100L)); + checkPosition(position, 9L, 99L); + // advance (9L, 98L) takes no effect + assertFalse(position.advance(9L, 98L)); + checkPosition(position, 9L, 99L); + // advance (9L, 99L) takes no effect + assertFalse(position.advance(9L, 99L)); + checkPosition(position, 9L, 99L); + // advance (9L, 100L) takes effects + assertTrue(position.advance(9L, 100L)); + checkPosition(position, 9L, 100L); + // advance (10L, 0L) takes effects + assertTrue(position.advance(10L, 0L)); + checkPosition(position, 10L, 0L); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java index 775c99d..3f47337 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java @@ -148,9 +148,8 @@ public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase { LOG.info("Writer stopped after writing {} records, waiting for reader to complete", writeCount.get()); while (writeCount.get() > (readerThreads[0].getReadCount())) { - LOG.info("Write Count = {}, Read Count = {}, ReadAhead = {}", - new Object[] { writeCount.get(), readerThreads[0].getReadCount(), - reader0.getReadHandler().getReadAheadCache() }); + LOG.info("Write Count = {}, Read Count = {}", + new Object[] { writeCount.get(), readerThreads[0].getReadCount() }); TimeUnit.MILLISECONDS.sleep(100); } assertEquals(writeCount.get(), http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java deleted file mode 100644 index 71b6834..0000000 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.twitter.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion; -import com.twitter.distributedlog.util.Utils; -import com.twitter.distributedlog.readahead.ReadAheadWorker; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Future; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.junit.Assert.*; - -/** - * {@link ReadAheadWorker} related test cases. - */ -public class TestReadAhead extends TestDistributedLogBase { - - static final Logger logger = LoggerFactory.getLogger(TestReadAhead.class); - - @Test(timeout = 60000) - public void testNoSuchLedgerExceptionOnReadLAC() throws Exception { - String name = "distrlog-nosuchledger-exception-on-readlac"; - DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); - confLocal.loadConf(conf); - confLocal.setReadAheadWaitTime(500); - confLocal.setReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis(2000); - confLocal.setDLLedgerMetadataLayoutVersion(LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value); - confLocal.setLogSegmentSequenceNumberValidationEnabled(false); - - DistributedLogManager dlm = createNewDLM(confLocal, name); - DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(dlm, confLocal, 1L, 1L, false, 0, false); - DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(dlm, confLocal, 2L, 11L, true, 10, true); - - BKDistributedLogManager readDLM = (BKDistributedLogManager) createNewDLM(confLocal, name); - final BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) readDLM.getAsyncLogReader(DLSN.InitialDLSN); - final Future<LogRecordWithDLSN> readFuture = reader.readNext(); - try { - Await.result(readFuture, Duration.fromMilliseconds(2000)); - fail("Should not read any data beyond an empty inprogress log segment"); - } catch (TimeoutException e) { - // expected - } - - LedgerDescriptor ld1; - while (null == (ld1 = reader.bkLedgerManager.readAheadWorker.getCurrentLedgerDescriptor())) { - Thread.sleep(100); - } - - TimeUnit.MILLISECONDS.sleep(2 * 2000); - - LedgerDescriptor ld2; - while (null == (ld2 = reader.bkLedgerManager.readAheadWorker.getCurrentLedgerDescriptor())) { - Thread.sleep(100); - } - - // ledger handle would be re-initialized after reaching error threshold - assertTrue("ledger handle should be reinitialized, after reaching error threshold.", ld1 != ld2); - - dlm.close(); - - dlm = createNewDLM(confLocal, name); - dlm.recover(); - - long expectedTxId = 11L; - LogRecord record = Await.result(readFuture); - assertNotNull(record); - DLMTestUtil.verifyLogRecord(record); - assertEquals(expectedTxId, record.getTransactionId()); - expectedTxId++; - - for (int i = 1; i < 10; i++) { - record = Await.result(reader.readNext()); - assertNotNull(record); - DLMTestUtil.verifyLogRecord(record); - assertEquals(expectedTxId, record.getTransactionId()); - expectedTxId++; - } - - Utils.close(reader); - readDLM.close(); - - } - - @Test(timeout = 60000) - public void testReadAheadWaitOnEndOfStream() throws Exception { - String name = "distrlog-readahead-wait-on-end-of-stream"; - DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); - confLocal.loadConf(conf); - confLocal.setZKNumRetries(0); - confLocal.setReadAheadWaitTime(500); - confLocal.setReadAheadWaitTimeOnEndOfStream(Integer.MAX_VALUE); - - DistributedLogManager dlm = createNewDLM(confLocal, name); - DLMTestUtil.generateCompletedLogSegments(dlm, confLocal, 3, 10); - - BKDistributedLogManager readDLM = (BKDistributedLogManager) createNewDLM(confLocal, name); - final BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) readDLM.getAsyncLogReader(DLSN.InitialDLSN); - - int numReads = 0; - long expectedID = 1; - for (long i = 0; i < 3; i++) { - for (long j = 1; j <= 10; j++) { - LogRecordWithDLSN record = Await.result(reader.readNext()); - assertEquals(expectedID++, record.getTransactionId()); - DLMTestUtil.verifyLogRecord(record); - ++numReads; - } - } - assertEquals(30, numReads); - // we are at the end of the stream and there isn't inprogress log segment - Future<LogRecordWithDLSN> readFuture = reader.readNext(); - - // make sure readahead is backing off on reading log segment on Integer.MAX_VALUE - AsyncNotification notification1; - while (null == (notification1 = reader.bkLedgerManager.readAheadWorker.getMetadataNotification())) { - Thread.sleep(200); - } - Thread.sleep(1000); - - // write another record - BKSyncLogWriter writer = - (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); - writer.write(DLMTestUtil.getLogRecordInstance(31L)); - writer.closeAndComplete(); - - LogRecordWithDLSN record = Await.result(readFuture); - assertEquals(31L, record.getTransactionId()); - DLMTestUtil.verifyLogRecord(record); - - Utils.close(reader); - readDLM.close(); - - dlm.close(); - } - -}