/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.collect.Lists;
import org.apache.distributedlog.callback.LogSegmentListener;
import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.EndOfLogSegmentException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.distributedlog.logsegment.LogSegmentEntryReader;
import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.distributedlog.logsegment.LogSegmentFilter;
import org.apache.distributedlog.util.OrderedScheduler;
import com.twitter.util.Function0;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Futures;
import com.twitter.util.Promise;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * New ReadAhead Reader that uses {@link org.apache.distributedlog.logsegment.LogSegmentEntryReader}.
 *
 * <p>It prefetches entries from the entry store into an in-memory queue ({@link #entryQueue}),
 * pausing when the cache is full and resuming when entries are drained by the consumer via
 * {@link #getNextReadAheadEntry(long, TimeUnit)}.
 *
 * <p>NOTE: all the state changes happen in the same thread. All *unsafe* methods should be
 * submitted to the order scheduler using stream name as the key.
 */
public class ReadAheadEntryReader implements
        AsyncCloseable,
        LogSegmentListener,
        LogSegmentEntryReader.StateChangeListener,
        FutureEventListener<List<Entry.Reader>> {

    private static final Logger logger = LoggerFactory.getLogger(ReadAheadEntryReader.class);

    //
    // Static Functions
    //

    // Shared, stateless callback: start a log segment entry reader once it is opened.
    // Declared final so the reference cannot be reassigned.
    private static final AbstractFunction1<LogSegmentEntryReader, BoxedUnit> START_READER_FUNC =
            new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() {
        @Override
        public BoxedUnit apply(LogSegmentEntryReader reader) {
            reader.start();
            return BoxedUnit.UNIT;
        }
    };

    //
    // Internal Classes
    //

    /**
     * Wrapper around a {@link LogSegmentEntryReader} for one log segment, tracking its
     * open/start/close lifecycle. Methods are synchronized because the underlying reader
     * is opened asynchronously and may not be available yet when callers interact with it.
     */
    class SegmentReader implements FutureEventListener<LogSegmentEntryReader> {

        private LogSegmentMetadata metadata;
        private final long startEntryId;
        private Future<LogSegmentEntryReader> openFuture = null;
        private LogSegmentEntryReader reader = null;
        private boolean isStarted = false;
        private boolean isClosed = false;

        SegmentReader(LogSegmentMetadata metadata,
                      long startEntryId) {
            this.metadata = metadata;
            this.startEntryId = startEntryId;
        }

        synchronized LogSegmentEntryReader getEntryReader() {
            return reader;
        }

        synchronized boolean isBeyondLastAddConfirmed() {
            return null != reader && reader.isBeyondLastAddConfirmed();
        }

        synchronized LogSegmentMetadata getSegment() {
            return metadata;
        }

        synchronized boolean isReaderOpen() {
            return null != openFuture;
        }

        // Idempotent: the first call kicks off the asynchronous open; later calls are no-ops.
        synchronized void openReader() {
            if (null != openFuture) {
                return;
            }
            openFuture = entryStore.openReader(metadata, startEntryId).addEventListener(this);
        }

        synchronized boolean isReaderStarted() {
            return isStarted;
        }

        // Idempotent: start the reader immediately if the open already completed,
        // otherwise defer the start until the open future is satisfied.
        synchronized void startRead() {
            if (isStarted) {
                return;
            }
            isStarted = true;
            if (null != reader) {
                reader.start();
            } else {
                openFuture.onSuccess(START_READER_FUNC);
            }
        }

        /**
         * Read the next batch of entries. If the reader isn't open yet, the read is chained
         * onto the open future, which also propagates any open failure to the caller.
         */
        synchronized Future<List<Entry.Reader>> readNext() {
            if (null != reader) {
                checkCatchingUpStatus(reader);
                return reader.readNext(numReadAheadEntries);
            } else {
                return openFuture.flatMap(readFunc);
            }
        }

        synchronized void updateLogSegmentMetadata(final LogSegmentMetadata segment) {
            if (null != reader) {
                reader.onLogSegmentMetadataUpdated(segment);
                this.metadata = segment;
            } else {
                openFuture.onSuccess(new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() {
                    @Override
                    public BoxedUnit apply(LogSegmentEntryReader reader) {
                        reader.onLogSegmentMetadataUpdated(segment);
                        synchronized (SegmentReader.this) {
                            SegmentReader.this.metadata = segment;
                        }
                        return BoxedUnit.UNIT;
                    }
                });
            }
        }

        @Override
        synchronized public void onSuccess(LogSegmentEntryReader reader) {
            this.reader = reader;
            if (reader.getSegment().isInProgress()) {
                reader.registerListener(ReadAheadEntryReader.this);
            }
        }

        @Override
        public void onFailure(Throwable cause) {
            // no-op, the failure will be propagated on first read.
        }

        synchronized boolean isClosed() {
            return isClosed;
        }

        synchronized Future<Void> close() {
            if (null == openFuture) {
                return Future.Void();
            }
            return openFuture.flatMap(new AbstractFunction1<LogSegmentEntryReader, Future<Void>>() {
                @Override
                public Future<Void> apply(LogSegmentEntryReader reader) {
                    return reader.asyncClose();
                }
            }).ensure(new Function0<BoxedUnit>() {
                @Override
                public BoxedUnit apply() {
                    synchronized (SegmentReader.this) {
                        isClosed = true;
                    }
                    // FIX: return BoxedUnit.UNIT (not null) for consistency with every other
                    // scala Function0/Function1 implementation in this class.
                    return BoxedUnit.UNIT;
                }
            });
        }
    }

    // Read a batch of entries from an (asynchronously opened) log segment entry reader.
    private class ReadEntriesFunc extends AbstractFunction1<LogSegmentEntryReader, Future<List<Entry.Reader>>> {

        private final int numEntries;

        ReadEntriesFunc(int numEntries) {
            this.numEntries = numEntries;
        }

        @Override
        public Future<List<Entry.Reader>> apply(LogSegmentEntryReader reader) {
            checkCatchingUpStatus(reader);
            return reader.readNext(numEntries);
        }
    }

    /**
     * Runnable that is skipped once the reader is closing, and shields the scheduler
     * thread from any unexpected exception thrown by the task body.
     */
    private abstract class CloseableRunnable implements Runnable {

        @Override
        public void run() {
            synchronized (ReadAheadEntryReader.this) {
                if (null != closePromise) {
                    return;
                }
            }
            try {
                safeRun();
            } catch (Throwable cause) {
                logger.error("Caught unexpected exception : ", cause);
            }
        }

        abstract void safeRun();

    }

    //
    // Functions
    //
    private final Function1<LogSegmentEntryReader, Future<List<Entry.Reader>>> readFunc;
    private final Function0<BoxedUnit> removeClosedSegmentReadersFunc = new Function0<BoxedUnit>() {
        @Override
        public BoxedUnit apply() {
            removeClosedSegmentReaders();
            return BoxedUnit.UNIT;
        }
    };

    //
    // Resources
    //
    private final DistributedLogConfiguration conf;
    private final BKLogReadHandler readHandler;
    private final LogSegmentEntryStore entryStore;
    private final OrderedScheduler scheduler;

    //
    // Parameters
    //
    private final String streamName;
    private final DLSN fromDLSN;
    private final int maxCachedEntries;
    private final int numReadAheadEntries;
    private final int idleWarnThresholdMillis;

    //
    // Cache
    //
    private final LinkedBlockingQueue<Entry.Reader> entryQueue;

    //
    // State of the reader
    //

    private final AtomicBoolean started = new AtomicBoolean(false);
    private boolean isInitialized = false;
    private boolean readAheadPaused = false;
    private Promise<Void> closePromise = null;
    // segment readers
    private long currentSegmentSequenceNumber;
    private SegmentReader currentSegmentReader;
    private SegmentReader nextSegmentReader;
    private DLSN lastDLSN;
    private final EntryPosition nextEntryPosition;
    private volatile boolean isCatchingUp = true;
    private final LinkedList<SegmentReader> segmentReaders;
    private final LinkedList<SegmentReader> segmentReadersToClose;
    // last exception that this reader encounters
    private final AtomicReference<IOException> lastException = new AtomicReference<IOException>(null);
    // last entry added time
    private final Stopwatch lastEntryAddedTime;
    // state change notification
    private final CopyOnWriteArraySet<AsyncNotification> stateChangeNotifications =
            new CopyOnWriteArraySet<AsyncNotification>();
    // idle reader check task
    private final ScheduledFuture<?> idleReaderCheckTask;

    //
    // Stats
    //
    private final AlertStatsLogger alertStatsLogger;

    public ReadAheadEntryReader(String streamName,
                                DLSN fromDLSN,
                                DistributedLogConfiguration conf,
                                BKLogReadHandler readHandler,
                                LogSegmentEntryStore entryStore,
                                OrderedScheduler scheduler,
                                Ticker ticker,
                                AlertStatsLogger alertStatsLogger) {
        this.streamName = streamName;
        this.fromDLSN = lastDLSN = fromDLSN;
        this.nextEntryPosition = new EntryPosition(
                fromDLSN.getLogSegmentSequenceNo(),
                fromDLSN.getEntryId());
        this.conf = conf;
        this.maxCachedEntries = conf.getReadAheadMaxRecords();
        this.numReadAheadEntries = conf.getReadAheadBatchSize();
        this.idleWarnThresholdMillis = conf.getReaderIdleWarnThresholdMillis();
        this.readHandler = readHandler;
        this.entryStore = entryStore;
        this.scheduler = scheduler;
        this.readFunc = new ReadEntriesFunc(numReadAheadEntries);
        this.alertStatsLogger = alertStatsLogger;

        // create the segment reader list
        this.segmentReaders = new LinkedList<SegmentReader>();
        this.segmentReadersToClose = new LinkedList<SegmentReader>();
        // create the readahead entry queue
        this.entryQueue = new LinkedBlockingQueue<Entry.Reader>();

        // start the idle reader detection
        lastEntryAddedTime = Stopwatch.createStarted(ticker);
        // start the idle reader check task
        idleReaderCheckTask = scheduleIdleReaderTaskIfNecessary();
    }

    // Periodically checks whether readahead has been idle for longer than the configured
    // warn threshold; returns null when the feature is disabled by configuration.
    private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
        if (idleWarnThresholdMillis < Integer.MAX_VALUE && idleWarnThresholdMillis > 0) {
            return scheduler.scheduleAtFixedRate(streamName, new Runnable() {
                @Override
                public void run() {
                    if (!isReaderIdle(idleWarnThresholdMillis, TimeUnit.MILLISECONDS)) {
                        return;
                    }
                    // the readahead has been idle
                    unsafeCheckIfReadAheadIsIdle();
                }
            }, idleWarnThresholdMillis, idleWarnThresholdMillis, TimeUnit.MILLISECONDS);
        }
        return null;
    }

    // When idle with no current segment (or the current one is beyond LAC), force a
    // re-read of the log segment list from the store to pick up newly created segments.
    private void unsafeCheckIfReadAheadIsIdle() {
        boolean forceReadLogSegments =
                (null == currentSegmentReader) || currentSegmentReader.isBeyondLastAddConfirmed();
        if (forceReadLogSegments) {
            readHandler.readLogSegmentsFromStore(
                    LogSegmentMetadata.COMPARATOR,
                    LogSegmentFilter.DEFAULT_FILTER,
                    null
            ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
                @Override
                public void onFailure(Throwable cause) {
                    // do nothing here since it would be retried on next idle reader check task
                }

                @Override
                public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
                    onSegmentsUpdated(segments.getValue());
                }
            });
        }
    }

    private void cancelIdleReaderTask() {
        if (null != idleReaderCheckTask) {
            idleReaderCheckTask.cancel(true);
        }
    }

    @VisibleForTesting
    EntryPosition getNextEntryPosition() {
        return nextEntryPosition;
    }

    @VisibleForTesting
    SegmentReader getCurrentSegmentReader() {
        return currentSegmentReader;
    }

    @VisibleForTesting
    long getCurrentSegmentSequenceNumber() {
        return currentSegmentSequenceNumber;
    }

    @VisibleForTesting
    SegmentReader getNextSegmentReader() {
        return nextSegmentReader;
    }

    @VisibleForTesting
    LinkedList<SegmentReader> getSegmentReaders() {
        return segmentReaders;
    }

    @VisibleForTesting
    boolean isInitialized() {
        return isInitialized;
    }

    // Submit a task on the per-stream ordered executor; silently dropped if the reader
    // is already closing or the scheduler has been shut down.
    private void orderedSubmit(Runnable runnable) {
        synchronized (this) {
            if (null != closePromise) {
                return;
            }
        }
        try {
            scheduler.submit(streamName, runnable);
        } catch (RejectedExecutionException ree) {
            logger.debug("Failed to submit and execute an operation for readhead entry reader of {}",
                    streamName, ree);
        }
    }

    public void start(final List<LogSegmentMetadata> segmentList) {
        logger.info("Starting the readahead entry reader for {} : segments = {}",
                readHandler.getFullyQualifiedName(), segmentList);
        started.set(true);
        processLogSegments(segmentList);
    }

    private void removeClosedSegmentReaders() {
        orderedSubmit(new CloseableRunnable() {
            @Override
            void safeRun() {
                unsafeRemoveClosedSegmentReaders();
            }
        });
    }

    // Drain fully-closed readers from the head of the to-close queue; stop at the first
    // reader that is still closing to preserve ordering.
    private void unsafeRemoveClosedSegmentReaders() {
        SegmentReader reader = segmentReadersToClose.peekFirst();
        while (null != reader) {
            if (reader.isClosed()) {
                segmentReadersToClose.pollFirst();
                reader = segmentReadersToClose.peekFirst();
            } else {
                break;
            }
        }
    }

    @Override
    public Future<Void> asyncClose() {
        final Promise<Void> closeFuture;
        synchronized (this) {
            if (null != closePromise) {
                return closePromise;
            }
            closePromise = closeFuture = new Promise<Void>();
        }

        // cancel the idle reader task
        cancelIdleReaderTask();

        // use runnable here instead of CloseableRunnable,
        // because we need this to be executed
        try {
            scheduler.submit(streamName, new Runnable() {
                @Override
                public void run() {
                    unsafeAsyncClose(closeFuture);
                }
            });
        } catch (RejectedExecutionException ree) {
            logger.warn("Scheduler has been shutdown before closing the readahead entry reader for stream {}",
                    streamName, ree);
            unsafeAsyncClose(closeFuture);
        }

        return closeFuture;
    }

    private void unsafeAsyncClose(Promise<Void> closePromise) {
        List<Future<Void>> closeFutures = Lists.newArrayListWithExpectedSize(
                segmentReaders.size() + segmentReadersToClose.size() + 1);
        if (null != currentSegmentReader) {
            segmentReadersToClose.add(currentSegmentReader);
        }
        if (null != nextSegmentReader) {
            segmentReadersToClose.add(nextSegmentReader);
        }
        for (SegmentReader reader : segmentReaders) {
            segmentReadersToClose.add(reader);
        }
        segmentReaders.clear();
        for (SegmentReader reader : segmentReadersToClose) {
            closeFutures.add(reader.close());
        }
        Futures.collect(closeFutures).proxyTo(closePromise);
    }

    //
    // Reader State Changes
    //

    ReadAheadEntryReader addStateChangeNotification(AsyncNotification notification) {
        this.stateChangeNotifications.add(notification);
        return this;
    }

    ReadAheadEntryReader removeStateChangeNotification(AsyncNotification notification) {
        this.stateChangeNotifications.remove(notification);
        return this;
    }

    private void notifyStateChangeOnSuccess() {
        for (AsyncNotification notification : stateChangeNotifications) {
            notification.notifyOnOperationComplete();
        }
    }

    private void notifyStateChangeOnFailure(Throwable cause) {
        for (AsyncNotification notification : stateChangeNotifications) {
            notification.notifyOnError(cause);
        }
    }

    /**
     * Record the first exception encountered; subsequent exceptions are only logged.
     * Listeners are notified of the failure either way.
     */
    void setLastException(IOException cause) {
        if (!lastException.compareAndSet(null, cause)) {
            // FIX: use a '{}' placeholder so the already-set exception is interpolated
            // into the message instead of being treated as a throwable argument.
            logger.debug("last exception has already been set to {}", lastException.get());
        }
        // the exception is set and notify the state change
        notifyStateChangeOnFailure(cause);
    }

    void checkLastException() throws IOException {
        // snapshot the reference once to avoid re-reading the atomic between check and throw
        IOException cause = lastException.get();
        if (null != cause) {
            throw cause;
        }
    }

    void checkCatchingUpStatus(LogSegmentEntryReader reader) {
        if (reader.getSegment().isInProgress()
                && isCatchingUp
                && reader.hasCaughtUpOnInprogress()) {
            logger.info("ReadAhead for {} is caught up at entry {} @ log segment {}.",
                    new Object[] { readHandler.getFullyQualifiedName(),
                            reader.getLastAddConfirmed(), reader.getSegment() });
            isCatchingUp = false;
        }
    }

    void markCaughtup() {
        if (isCatchingUp) {
            isCatchingUp = false;
            logger.info("ReadAhead for {} is caught up", readHandler.getFullyQualifiedName());
        }
    }

    public boolean isReadAheadCaughtUp() {
        return !isCatchingUp;
    }

    @Override
    public void onCaughtupOnInprogress() {
        markCaughtup();
    }

    //
    // ReadAhead State Machine
    //

    /**
     * A batch of entries arrived: enqueue them, advance the next-entry position,
     * notify waiters, then either pause (cache full) or schedule the next read.
     */
    @Override
    public void onSuccess(List<Entry.Reader> entries) {
        lastEntryAddedTime.reset().start();
        for (Entry.Reader entry : entries) {
            entryQueue.add(entry);
        }
        if (!entries.isEmpty()) {
            Entry.Reader lastEntry = entries.get(entries.size() - 1);
            nextEntryPosition.advance(lastEntry.getLSSN(), lastEntry.getEntryId() + 1);
        }
        // notify on data available
        notifyStateChangeOnSuccess();
        if (entryQueue.size() >= maxCachedEntries) {
            pauseReadAheadOnCacheFull();
        } else {
            scheduleReadNext();
        }
    }

    @Override
    public void onFailure(Throwable cause) {
        if (cause instanceof EndOfLogSegmentException) {
            // we reach end of the log segment
            moveToNextLogSegment();
            return;
        }
        if (cause instanceof IOException) {
            setLastException((IOException) cause);
        } else {
            setLastException(new UnexpectedException("Unexpected non I/O exception", cause));
        }
    }

    private synchronized void invokeReadAhead() {
        if (readAheadPaused) {
            scheduleReadNext();
            readAheadPaused = false;
        }
    }

    private synchronized void pauseReadAheadOnCacheFull() {
        this.readAheadPaused = true;
        // re-check under the lock: the consumer may have drained the cache between the
        // caller's size check and acquiring this monitor.
        if (!isCacheFull()) {
            invokeReadAhead();
        }
    }

    private synchronized void pauseReadAheadOnNoMoreLogSegments() {
        this.readAheadPaused = true;
    }

    //
    // Cache Related Methods
    //

    /**
     * Poll the next readahead entry, waiting up to the given time.
     *
     * @param waitTime max time to wait
     * @param waitTimeUnit unit of the wait time
     * @return next entry, or null if none arrived within the wait time
     * @throws IOException if the reader has encountered an exception, or the wait is interrupted
     */
    public Entry.Reader getNextReadAheadEntry(long waitTime, TimeUnit waitTimeUnit) throws IOException {
        checkLastException();
        Entry.Reader entry;
        try {
            entry = entryQueue.poll(waitTime, waitTimeUnit);
        } catch (InterruptedException e) {
            throw new DLInterruptedException("Interrupted on waiting next readahead entry : ", e);
        }
        try {
            return entry;
        } finally {
            // resume readahead if the cache becomes empty
            if (null != entry && !isCacheFull()) {
                invokeReadAhead();
            }
        }
    }

    /**
     * Return number cached entries.
     *
     * @return number cached entries.
     */
    public int getNumCachedEntries() {
        return entryQueue.size();
    }

    /**
     * Return if the cache is full.
     *
     * @return true if the cache is full, otherwise false.
     */
    public boolean isCacheFull() {
        return getNumCachedEntries() >= maxCachedEntries;
    }

    @VisibleForTesting
    public boolean isCacheEmpty() {
        return entryQueue.isEmpty();
    }

    /**
     * Check whether the readahead becomes stall.
     *
     * @param idleReaderErrorThreshold idle reader error threshold
     * @param timeUnit time unit of the idle reader error threshold
     * @return true if the readahead becomes stall, otherwise false.
     */
    public boolean isReaderIdle(int idleReaderErrorThreshold, TimeUnit timeUnit) {
        return (lastEntryAddedTime.elapsed(timeUnit) > idleReaderErrorThreshold);
    }

    //
    // LogSegment Management
    //

    void processLogSegments(final List<LogSegmentMetadata> segments) {
        orderedSubmit(new CloseableRunnable() {
            @Override
            void safeRun() {
                unsafeProcessLogSegments(segments);
            }
        });
    }

    private void unsafeProcessLogSegments(List<LogSegmentMetadata> segments) {
        if (isInitialized) {
            unsafeReinitializeLogSegments(segments);
        } else {
            unsafeInitializeLogSegments(segments);
        }
    }

    /**
     * Update the log segment metadata.
     *
     * @param reader the reader to update the metadata
     * @param newMetadata the new metadata received
     * @return true if successfully, false on encountering errors
     */
    private boolean updateLogSegmentMetadata(SegmentReader reader,
                                             LogSegmentMetadata newMetadata) {
        if (reader.getSegment().getLogSegmentSequenceNumber() != newMetadata.getLogSegmentSequenceNumber()) {
            setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
                    + streamName + " : current segment = " + reader.getSegment() + ", new segment = " + newMetadata));
            return false;
        }
        if (!reader.getSegment().isInProgress() && newMetadata.isInProgress()) {
            setLastException(new DLIllegalStateException("An inprogress log segment " + newMetadata
                    + " received after a closed log segment " + reader.getSegment() + " on reading segment "
                    + newMetadata.getLogSegmentSequenceNumber() + " @ stream " + streamName));
            return false;
        }
        if (reader.getSegment().isInProgress() && !newMetadata.isInProgress()) {
            reader.updateLogSegmentMetadata(newMetadata);
        }
        return true;
    }

    /**
     * Reinitialize the log segments
     */
    private void unsafeReinitializeLogSegments(List<LogSegmentMetadata> segments) {
        logger.info("Reinitialize log segments with {}", segments);
        int segmentIdx = 0;
        // skip segments that the reader has already moved past
        for (; segmentIdx < segments.size(); segmentIdx++) {
            LogSegmentMetadata segment = segments.get(segmentIdx);
            if (segment.getLogSegmentSequenceNumber() < currentSegmentSequenceNumber) {
                continue;
            }
            break;
        }
        if (segmentIdx >= segments.size()) {
            return;
        }
        LogSegmentMetadata segment = segments.get(segmentIdx);
        if (null != currentSegmentReader) {
            if (!updateLogSegmentMetadata(currentSegmentReader, segment)) {
                return;
            }
        } else {
            if (currentSegmentSequenceNumber != segment.getLogSegmentSequenceNumber()) {
                setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
                        + streamName + " : current segment sn = " + currentSegmentSequenceNumber
                        + ", new segment sn = " + segment.getLogSegmentSequenceNumber()));
                return;
            }
        }
        segmentIdx++;
        if (segmentIdx >= segments.size()) {
            return;
        }
        // check next segment
        segment = segments.get(segmentIdx);
        if (null != nextSegmentReader) {
            if (!updateLogSegmentMetadata(nextSegmentReader, segment)) {
                return;
            }
            segmentIdx++;
        }
        // check the segment readers in the queue
        for (int readerIdx = 0;
             readerIdx < segmentReaders.size() && segmentIdx < segments.size();
             readerIdx++, segmentIdx++) {
            SegmentReader reader = segmentReaders.get(readerIdx);
            segment = segments.get(segmentIdx);
            if (!updateLogSegmentMetadata(reader, segment)) {
                return;
            }
        }
        // add the remaining segments to the reader queue
        for (; segmentIdx < segments.size(); segmentIdx++) {
            segment = segments.get(segmentIdx);
            SegmentReader reader = new SegmentReader(segment, 0L);
            reader.openReader();
            segmentReaders.add(reader);
        }
        if (null == currentSegmentReader) {
            unsafeMoveToNextLogSegment();
        }
        // resume readahead if necessary
        invokeReadAhead();
    }

    /**
     * Initialize the reader with the log <i>segments</i>.
     *
     * @param segments list of log segments
     */
    private void unsafeInitializeLogSegments(List<LogSegmentMetadata> segments) {
        if (segments.isEmpty()) {
            // not initialize the background reader, until the first log segment is notified
            return;
        }
        boolean skipTruncatedLogSegments = true;
        DLSN dlsnToStart = fromDLSN;
        // positioning the reader
        for (int i = 0; i < segments.size(); i++) {
            LogSegmentMetadata segment = segments.get(i);
            // skip any log segments that have smaller log segment sequence numbers
            if (segment.getLogSegmentSequenceNumber() < fromDLSN.getLogSegmentSequenceNo()) {
                continue;
            }
            // if the log segment is truncated, skip it.
            if (skipTruncatedLogSegments &&
                    !conf.getIgnoreTruncationStatus() &&
                    segment.isTruncated()) {
                continue;
            }
            // if the log segment is partially truncated, move the start dlsn to the min active dlsn
            if (skipTruncatedLogSegments &&
                    !conf.getIgnoreTruncationStatus() &&
                    segment.isPartiallyTruncated()) {
                if (segment.getMinActiveDLSN().compareTo(fromDLSN) > 0) {
                    dlsnToStart = segment.getMinActiveDLSN();
                }
            }
            skipTruncatedLogSegments = false;
            if (!isAllowedToPosition(segment, dlsnToStart)) {
                logger.error("segment {} is not allowed to position at {}", segment, dlsnToStart);
                return;
            }

            SegmentReader reader = new SegmentReader(segment,
                    segment.getLogSegmentSequenceNumber() == dlsnToStart.getLogSegmentSequenceNo()
                            ? dlsnToStart.getEntryId() : 0L);
            segmentReaders.add(reader);
        }
        if (segmentReaders.isEmpty()) {
            // not initialize the background reader, until the first log segment is available to read
            return;
        }
        currentSegmentReader = segmentReaders.pollFirst();
        currentSegmentReader.openReader();
        currentSegmentReader.startRead();
        currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
        unsafeReadNext(currentSegmentReader);
        if (!segmentReaders.isEmpty()) {
            for (SegmentReader reader : segmentReaders) {
                reader.openReader();
            }
            unsafePrefetchNextSegment(true);
        }
        // mark the reader initialized
        isInitialized = true;
    }

    private void unsafePrefetchNextSegment(boolean onlyInprogressLogSegment) {
        SegmentReader nextReader = segmentReaders.peekFirst();
        // open the next log segment if it is inprogress
        if (null != nextReader) {
            if (onlyInprogressLogSegment && !nextReader.getSegment().isInProgress()) {
                return;
            }
            nextReader.startRead();
            nextSegmentReader = nextReader;
            segmentReaders.pollFirst();
        }
    }

    /**
     * Check if we are allowed to position the reader at <i>fromDLSN</i>.
     *
     * @return true if it is allowed, otherwise false.
     */
    private boolean isAllowedToPosition(LogSegmentMetadata segment, DLSN fromDLSN) {
        if (segment.isTruncated()
                && segment.getLastDLSN().compareTo(fromDLSN) >= 0
                && !conf.getIgnoreTruncationStatus()) {
            setLastException(new AlreadyTruncatedTransactionException(streamName
                    + " : trying to position read ahead at " + fromDLSN
                    + " on a segment " + segment + " that is already marked as truncated"));
            return false;
        }
        if (segment.isPartiallyTruncated() &&
                segment.getMinActiveDLSN().compareTo(fromDLSN) > 0) {
            if (conf.getAlertWhenPositioningOnTruncated()) {
                alertStatsLogger.raise("Trying to position reader on {} when {} is marked partially truncated",
                        fromDLSN, segment);
            }
            if (!conf.getIgnoreTruncationStatus()) {
                logger.error("{}: Trying to position reader on {} when {} is marked partially truncated",
                        new Object[]{ streamName, fromDLSN, segment });

                setLastException(new AlreadyTruncatedTransactionException(streamName
                        + " : trying to position read ahead at " + fromDLSN
                        + " on a segment " + segment + " that is already marked as truncated"));
                return false;
            }
        }
        return true;
    }

    void moveToNextLogSegment() {
        orderedSubmit(new CloseableRunnable() {
            @Override
            void safeRun() {
                unsafeMoveToNextLogSegment();
            }
        });
    }

    private void unsafeMoveToNextLogSegment() {
        if (null != currentSegmentReader) {
            segmentReadersToClose.add(currentSegmentReader);
            currentSegmentReader.close().ensure(removeClosedSegmentReadersFunc);
            logger.debug("close current segment reader {}", currentSegmentReader.getSegment());
            currentSegmentReader = null;
        }
        boolean hasSegmentToRead = false;
        if (null != nextSegmentReader) {
            currentSegmentReader = nextSegmentReader;
            logger.debug("move to read segment {}", currentSegmentReader.getSegment());
            currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
            nextSegmentReader = null;
            // start reading
            unsafeReadNext(currentSegmentReader);
            unsafePrefetchNextSegment(true);
            hasSegmentToRead = true;
        } else {
            unsafePrefetchNextSegment(false);
            if (null != nextSegmentReader) {
                currentSegmentReader = nextSegmentReader;
                logger.debug("move to read segment {}", currentSegmentReader.getSegment());
                currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
                nextSegmentReader = null;
                unsafeReadNext(currentSegmentReader);
                unsafePrefetchNextSegment(true);
                hasSegmentToRead = true;
            }
        }
        if (!hasSegmentToRead) { // no more segment to read, wait until new log segment arrive
            if (isCatchingUp) {
                logger.info("ReadAhead for {} is caught up and no log segments to read now",
                        readHandler.getFullyQualifiedName());
                isCatchingUp = false;
            }
            pauseReadAheadOnNoMoreLogSegments();
        }
    }

    void scheduleReadNext() {
        orderedSubmit(new CloseableRunnable() {
            @Override
            void safeRun() {
                if (null == currentSegmentReader) {
                    pauseReadAheadOnNoMoreLogSegments();
                    return;
                }
                unsafeReadNext(currentSegmentReader);
            }
        });
    }

    private void unsafeReadNext(SegmentReader reader) {
        reader.readNext().addEventListener(this);
    }

    @Override
    public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
        if (!started.get()) {
            return;
        }
        logger.info("segments is updated with {}", segments);
        processLogSegments(segments);
    }

    @Override
    public void onLogStreamDeleted() {
        setLastException(new LogNotFoundException("Log stream "
                + streamName + " is deleted"));
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java new file mode 100644 index 0000000..9935d5f --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java @@ -0,0 +1,782 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.distributedlog.logsegment.LogSegmentEntryStore; +import org.apache.distributedlog.logsegment.LogSegmentRandomAccessEntryReader; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import org.apache.distributedlog.selector.FirstDLSNNotLessThanSelector; +import org.apache.distributedlog.selector.FirstTxIdNotLessThanSelector; +import org.apache.distributedlog.selector.LastRecordSelector; +import org.apache.distributedlog.selector.LogRecordSelector; +import org.apache.distributedlog.util.FutureUtils.FutureEventListenerRunnable; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction0; +import scala.runtime.BoxedUnit; + +/** + * Utility function for readers + */ +public class ReadUtils { + + static final Logger LOG = LoggerFactory.getLogger(ReadUtils.class); + + private static final int MIN_SEARCH_BATCH_SIZE = 2; + + // + // Read First & Last Record Functions + // + + /** + * Read last record from a log segment. + * + * @param streamName + * fully qualified stream name (used for logging) + * @param l + * log segment metadata. + * @param fence + * whether to fence the log segment. + * @param includeControl + * whether to include control record. + * @param includeEndOfStream + * whether to include end of stream. 
+ * @param scanStartBatchSize + * first num entries used for read last record scan + * @param scanMaxBatchSize + * max num entries used for read last record scan + * @param numRecordsScanned + * num of records scanned to get last record + * @param executorService + * executor service used for processing entries + * @param entryStore + * log segment entry store + * @return a future with last record. + */ + public static Future<LogRecordWithDLSN> asyncReadLastRecord( + final String streamName, + final LogSegmentMetadata l, + final boolean fence, + final boolean includeControl, + final boolean includeEndOfStream, + final int scanStartBatchSize, + final int scanMaxBatchSize, + final AtomicInteger numRecordsScanned, + final ExecutorService executorService, + final LogSegmentEntryStore entryStore) { + final LogRecordSelector selector = new LastRecordSelector(); + return asyncReadRecord(streamName, l, fence, includeControl, includeEndOfStream, scanStartBatchSize, + scanMaxBatchSize, numRecordsScanned, executorService, entryStore, + selector, true /* backward */, 0L); + } + + /** + * Read first record from a log segment with a DLSN larger than that given. + * + * @param streamName + * fully qualified stream name (used for logging) + * @param l + * log segment metadata. + * @param scanStartBatchSize + * first num entries used for read last record scan + * @param scanMaxBatchSize + * max num entries used for read last record scan + * @param numRecordsScanned + * num of records scanned to get last record + * @param executorService + * executor service used for processing entries + * @param entryStore + * log segment entry store + * @param dlsn + * threshold dlsn + * @return a future with last record. 
+ */ + public static Future<LogRecordWithDLSN> asyncReadFirstUserRecord( + final String streamName, + final LogSegmentMetadata l, + final int scanStartBatchSize, + final int scanMaxBatchSize, + final AtomicInteger numRecordsScanned, + final ExecutorService executorService, + final LogSegmentEntryStore entryStore, + final DLSN dlsn) { + long startEntryId = 0L; + if (l.getLogSegmentSequenceNumber() == dlsn.getLogSegmentSequenceNo()) { + startEntryId = dlsn.getEntryId(); + } + final LogRecordSelector selector = new FirstDLSNNotLessThanSelector(dlsn); + return asyncReadRecord(streamName, l, false, false, false, scanStartBatchSize, + scanMaxBatchSize, numRecordsScanned, executorService, entryStore, + selector, false /* backward */, startEntryId); + } + + // + // Private methods for scanning log segments + // + + private static class ScanContext { + // variables to about current scan state + final AtomicInteger numEntriesToScan; + final AtomicLong curStartEntryId; + final AtomicLong curEndEntryId; + + // scan settings + final long startEntryId; + final long endEntryId; + final int scanStartBatchSize; + final int scanMaxBatchSize; + final boolean includeControl; + final boolean includeEndOfStream; + final boolean backward; + + // number of records scanned + final AtomicInteger numRecordsScanned; + + ScanContext(long startEntryId, long endEntryId, + int scanStartBatchSize, + int scanMaxBatchSize, + boolean includeControl, + boolean includeEndOfStream, + boolean backward, + AtomicInteger numRecordsScanned) { + this.startEntryId = startEntryId; + this.endEntryId = endEntryId; + this.scanStartBatchSize = scanStartBatchSize; + this.scanMaxBatchSize = scanMaxBatchSize; + this.includeControl = includeControl; + this.includeEndOfStream = includeEndOfStream; + this.backward = backward; + // Scan state + this.numEntriesToScan = new AtomicInteger(scanStartBatchSize); + if (backward) { + this.curStartEntryId = new AtomicLong( + Math.max(startEntryId, (endEntryId - scanStartBatchSize 
+ 1))); + this.curEndEntryId = new AtomicLong(endEntryId); + } else { + this.curStartEntryId = new AtomicLong(startEntryId); + this.curEndEntryId = new AtomicLong( + Math.min(endEntryId, (startEntryId + scanStartBatchSize - 1))); + } + this.numRecordsScanned = numRecordsScanned; + } + + boolean moveToNextRange() { + if (backward) { + return moveBackward(); + } else { + return moveForward(); + } + } + + boolean moveBackward() { + long nextEndEntryId = curStartEntryId.get() - 1; + if (nextEndEntryId < startEntryId) { + // no entries to read again + return false; + } + curEndEntryId.set(nextEndEntryId); + // update num entries to scan + numEntriesToScan.set( + Math.min(numEntriesToScan.get() * 2, scanMaxBatchSize)); + // update start entry id + curStartEntryId.set(Math.max(startEntryId, nextEndEntryId - numEntriesToScan.get() + 1)); + return true; + } + + boolean moveForward() { + long nextStartEntryId = curEndEntryId.get() + 1; + if (nextStartEntryId > endEntryId) { + // no entries to read again + return false; + } + curStartEntryId.set(nextStartEntryId); + // update num entries to scan + numEntriesToScan.set( + Math.min(numEntriesToScan.get() * 2, scanMaxBatchSize)); + // update start entry id + curEndEntryId.set(Math.min(endEntryId, nextStartEntryId + numEntriesToScan.get() - 1)); + return true; + } + } + + private static class SingleEntryScanContext extends ScanContext { + SingleEntryScanContext(long entryId) { + super(entryId, entryId, 1, 1, true, true, false, new AtomicInteger(0)); + } + } + + /** + * Read record from a given range of log segment entries. + * + * @param streamName + * fully qualified stream name (used for logging) + * @param reader + * log segment random access reader + * @param executorService + * executor service used for processing entries + * @param context + * scan context + * @return a future with the log record. 
+ */ + private static Future<LogRecordWithDLSN> asyncReadRecordFromEntries( + final String streamName, + final LogSegmentRandomAccessEntryReader reader, + final LogSegmentMetadata metadata, + final ExecutorService executorService, + final ScanContext context, + final LogRecordSelector selector) { + final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>(); + final long startEntryId = context.curStartEntryId.get(); + final long endEntryId = context.curEndEntryId.get(); + if (LOG.isDebugEnabled()) { + LOG.debug("{} reading entries [{} - {}] from {}.", + new Object[] { streamName, startEntryId, endEntryId, metadata}); + } + FutureEventListener<List<Entry.Reader>> readEntriesListener = + new FutureEventListener<List<Entry.Reader>>() { + @Override + public void onSuccess(final List<Entry.Reader> entries) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} finished reading entries [{} - {}] from {}", + new Object[]{ streamName, startEntryId, endEntryId, metadata}); + } + for (Entry.Reader entry : entries) { + try { + visitEntryRecords(entry, context, selector); + } catch (IOException ioe) { + // exception is only thrown due to bad ledger entry, so it might be corrupted + // we shouldn't do anything beyond this point. throw the exception to application + promise.setException(ioe); + return; + } + } + + LogRecordWithDLSN record = selector.result(); + if (LOG.isDebugEnabled()) { + LOG.debug("{} got record from entries [{} - {}] of {} : {}", + new Object[]{streamName, startEntryId, endEntryId, + metadata, record}); + } + promise.setValue(record); + } + + @Override + public void onFailure(final Throwable cause) { + promise.setException(cause); + } + }; + reader.readEntries(startEntryId, endEntryId) + .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService)); + return promise; + } + + /** + * Process each record using LogRecordSelector. 
+ * + * @param entry + * ledger entry + * @param context + * scan context + * @return log record with dlsn inside the ledger entry + * @throws IOException + */ + private static void visitEntryRecords( + Entry.Reader entry, + ScanContext context, + LogRecordSelector selector) throws IOException { + LogRecordWithDLSN nextRecord = entry.nextRecord(); + while (nextRecord != null) { + LogRecordWithDLSN record = nextRecord; + nextRecord = entry.nextRecord(); + context.numRecordsScanned.incrementAndGet(); + if (!context.includeControl && record.isControl()) { + continue; + } + if (!context.includeEndOfStream && record.isEndOfStream()) { + continue; + } + selector.process(record); + } + } + + /** + * Scan entries for the given record. + * + * @param streamName + * fully qualified stream name (used for logging) + * @param reader + * log segment random access reader + * @param executorService + * executor service used for processing entries + * @param promise + * promise to return desired record. 
+ * @param context + * scan context + */ + private static void asyncReadRecordFromEntries( + final String streamName, + final LogSegmentRandomAccessEntryReader reader, + final LogSegmentMetadata metadata, + final ExecutorService executorService, + final Promise<LogRecordWithDLSN> promise, + final ScanContext context, + final LogRecordSelector selector) { + FutureEventListener<LogRecordWithDLSN> readEntriesListener = + new FutureEventListener<LogRecordWithDLSN>() { + @Override + public void onSuccess(LogRecordWithDLSN value) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} read record from [{} - {}] of {} : {}", + new Object[]{streamName, context.curStartEntryId.get(), context.curEndEntryId.get(), + metadata, value}); + } + if (null != value) { + promise.setValue(value); + return; + } + if (!context.moveToNextRange()) { + // no entries to read again + promise.setValue(null); + return; + } + // scan next range + asyncReadRecordFromEntries(streamName, + reader, + metadata, + executorService, + promise, + context, + selector); + } + + @Override + public void onFailure(Throwable cause) { + promise.setException(cause); + } + }; + asyncReadRecordFromEntries(streamName, reader, metadata, executorService, context, selector) + .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService)); + } + + private static void asyncReadRecordFromLogSegment( + final String streamName, + final LogSegmentRandomAccessEntryReader reader, + final LogSegmentMetadata metadata, + final ExecutorService executorService, + final int scanStartBatchSize, + final int scanMaxBatchSize, + final boolean includeControl, + final boolean includeEndOfStream, + final Promise<LogRecordWithDLSN> promise, + final AtomicInteger numRecordsScanned, + final LogRecordSelector selector, + final boolean backward, + final long startEntryId) { + final long lastAddConfirmed = reader.getLastAddConfirmed(); + if (lastAddConfirmed < 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Log segment {} is 
empty for {}.", new Object[] { metadata, streamName }); + } + promise.setValue(null); + return; + } + final ScanContext context = new ScanContext( + startEntryId, lastAddConfirmed, + scanStartBatchSize, scanMaxBatchSize, + includeControl, includeEndOfStream, backward, numRecordsScanned); + asyncReadRecordFromEntries(streamName, reader, metadata, executorService, + promise, context, selector); + } + + private static Future<LogRecordWithDLSN> asyncReadRecord( + final String streamName, + final LogSegmentMetadata l, + final boolean fence, + final boolean includeControl, + final boolean includeEndOfStream, + final int scanStartBatchSize, + final int scanMaxBatchSize, + final AtomicInteger numRecordsScanned, + final ExecutorService executorService, + final LogSegmentEntryStore entryStore, + final LogRecordSelector selector, + final boolean backward, + final long startEntryId) { + + final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>(); + + FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener = + new FutureEventListener<LogSegmentRandomAccessEntryReader>() { + @Override + public void onSuccess(final LogSegmentRandomAccessEntryReader reader) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} Opened log segment {} for reading record", + streamName, l); + } + promise.ensure(new AbstractFunction0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + reader.asyncClose(); + return BoxedUnit.UNIT; + } + }); + if (LOG.isDebugEnabled()) { + LOG.debug("{} {} scanning {}.", new Object[]{ + (backward ? 
"backward" : "forward"), streamName, l}); + } + asyncReadRecordFromLogSegment( + streamName, reader, l, executorService, + scanStartBatchSize, scanMaxBatchSize, + includeControl, includeEndOfStream, + promise, numRecordsScanned, selector, backward, startEntryId); + } + + @Override + public void onFailure(final Throwable cause) { + promise.setException(cause); + } + }; + entryStore.openRandomAccessReader(l, fence) + .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService)); + return promise; + } + + // + // Search Functions + // + + /** + * Get the log record whose transaction id is not less than provided <code>transactionId</code>. + * + * <p> + * It uses a binary-search like algorithm to find the log record whose transaction id is not less than + * provided <code>transactionId</code> within a log <code>segment</code>. You could think of a log segment + * in terms of a sequence of records whose transaction ids are non-decreasing. + * + * - The sequence of records within a log segment is divided into N pieces. + * - Find the piece of records that contains a record whose transaction id is not less than provided + * <code>transactionId</code>. + * + * N could be chosen based on trading off concurrency and latency. + * </p> + * + * @param logName + * name of the log + * @param segment + * metadata of the log segment + * @param transactionId + * transaction id + * @param executorService + * executor service used for processing entries + * @param entryStore + * log segment entry store + * @param nWays + * how many number of entries to search in parallel + * @return found log record. none if all transaction ids are less than provided <code>transactionId</code>. 
+ */ + public static Future<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId( + final String logName, + final LogSegmentMetadata segment, + final long transactionId, + final ExecutorService executorService, + final LogSegmentEntryStore entryStore, + final int nWays) { + if (!segment.isInProgress()) { + if (segment.getLastTxId() < transactionId) { + // all log records whose transaction id is less than provided transactionId + // then return none + Optional<LogRecordWithDLSN> noneRecord = Optional.absent(); + return Future.value(noneRecord); + } + } + + final Promise<Optional<LogRecordWithDLSN>> promise = + new Promise<Optional<LogRecordWithDLSN>>(); + final FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener = + new FutureEventListener<LogSegmentRandomAccessEntryReader>() { + @Override + public void onSuccess(final LogSegmentRandomAccessEntryReader reader) { + promise.ensure(new AbstractFunction0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + reader.asyncClose(); + return BoxedUnit.UNIT; + } + + }); + long lastEntryId = reader.getLastAddConfirmed(); + if (lastEntryId < 0) { + // it means that the log segment is created but not written yet or an empty log segment. 
+ // it is equivalent to 'all log records whose transaction id is less than provided transactionId' + Optional<LogRecordWithDLSN> nonRecord = Optional.absent(); + promise.setValue(nonRecord); + return; + } + // all log records whose transaction id is not less than provided transactionId + if (segment.getFirstTxId() >= transactionId) { + final FirstTxIdNotLessThanSelector selector = + new FirstTxIdNotLessThanSelector(transactionId); + asyncReadRecordFromEntries( + logName, + reader, + segment, + executorService, + new SingleEntryScanContext(0L), + selector + ).addEventListener(new FutureEventListener<LogRecordWithDLSN>() { + @Override + public void onSuccess(LogRecordWithDLSN value) { + promise.setValue(Optional.of(selector.result())); + } + + @Override + public void onFailure(Throwable cause) { + promise.setException(cause); + } + }); + + return; + } + getLogRecordNotLessThanTxIdFromEntries( + logName, + segment, + transactionId, + executorService, + reader, + Lists.newArrayList(0L, lastEntryId), + nWays, + Optional.<LogRecordWithDLSN>absent(), + promise); + } + + @Override + public void onFailure(final Throwable cause) { + promise.setException(cause); + } + }; + + entryStore.openRandomAccessReader(segment, false) + .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService)); + return promise; + } + + /** + * Find the log record whose transaction id is not less than provided <code>transactionId</code> from + * entries between <code>startEntryId</code> and <code>endEntryId</code>. 
+ * + * @param logName + * name of the log + * @param segment + * log segment + * @param transactionId + * provided transaction id to search + * @param executorService + * executor service + * @param reader + * log segment random access reader + * @param entriesToSearch + * list of entries to search + * @param nWays + * how many entries to search in parallel + * @param prevFoundRecord + * the log record found in previous search + * @param promise + * promise to satisfy the result + */ + private static void getLogRecordNotLessThanTxIdFromEntries( + final String logName, + final LogSegmentMetadata segment, + final long transactionId, + final ExecutorService executorService, + final LogSegmentRandomAccessEntryReader reader, + final List<Long> entriesToSearch, + final int nWays, + final Optional<LogRecordWithDLSN> prevFoundRecord, + final Promise<Optional<LogRecordWithDLSN>> promise) { + final List<Future<LogRecordWithDLSN>> searchResults = + Lists.newArrayListWithExpectedSize(entriesToSearch.size()); + for (Long entryId : entriesToSearch) { + LogRecordSelector selector = new FirstTxIdNotLessThanSelector(transactionId); + Future<LogRecordWithDLSN> searchResult = asyncReadRecordFromEntries( + logName, + reader, + segment, + executorService, + new SingleEntryScanContext(entryId), + selector); + searchResults.add(searchResult); + } + FutureEventListener<List<LogRecordWithDLSN>> processSearchResultsListener = + new FutureEventListener<List<LogRecordWithDLSN>>() { + @Override + public void onSuccess(List<LogRecordWithDLSN> resultList) { + processSearchResults( + logName, + segment, + transactionId, + executorService, + reader, + resultList, + nWays, + prevFoundRecord, + promise); + } + + @Override + public void onFailure(Throwable cause) { + promise.setException(cause); + } + }; + Future.collect(searchResults).addEventListener( + FutureEventListenerRunnable.of(processSearchResultsListener, executorService)); + } + + /** + * Process the search results + */ + static void 
processSearchResults( + final String logName, + final LogSegmentMetadata segment, + final long transactionId, + final ExecutorService executorService, + final LogSegmentRandomAccessEntryReader reader, + final List<LogRecordWithDLSN> searchResults, + final int nWays, + final Optional<LogRecordWithDLSN> prevFoundRecord, + final Promise<Optional<LogRecordWithDLSN>> promise) { + int found = -1; + for (int i = 0; i < searchResults.size(); i++) { + LogRecordWithDLSN record = searchResults.get(i); + if (record.getTransactionId() >= transactionId) { + found = i; + break; + } + } + if (found == -1) { // all log records' transaction id is less than provided transaction id + promise.setValue(prevFoundRecord); + return; + } + // we found a log record + LogRecordWithDLSN foundRecord = searchResults.get(found); + + // we found it + // - it is not the first record + // - it is the first record in first search entry + // - its entry is adjacent to previous search entry + if (foundRecord.getDlsn().getSlotId() != 0L + || found == 0 + || foundRecord.getDlsn().getEntryId() == (searchResults.get(found - 1).getDlsn().getEntryId() + 1)) { + promise.setValue(Optional.of(foundRecord)); + return; + } + + // otherwise, we need to search + List<Long> nextSearchBatch = getEntriesToSearch( + transactionId, + searchResults.get(found - 1), + searchResults.get(found), + nWays); + if (nextSearchBatch.isEmpty()) { + promise.setValue(prevFoundRecord); + return; + } + getLogRecordNotLessThanTxIdFromEntries( + logName, + segment, + transactionId, + executorService, + reader, + nextSearchBatch, + nWays, + Optional.of(foundRecord), + promise); + } + + /** + * Get the entries to search provided <code>transactionId</code> between + * <code>firstRecord</code> and <code>lastRecord</code>. 
<code>firstRecord</code> + * and <code>lastRecord</code> are already searched, which the transaction id + * of <code>firstRecord</code> is less than <code>transactionId</code> and the + * transaction id of <code>lastRecord</code> is not less than <code>transactionId</code>. + * + * @param transactionId + * transaction id to search + * @param firstRecord + * log record that already searched whose transaction id is leass than <code>transactionId</code>. + * @param lastRecord + * log record that already searched whose transaction id is not less than <code>transactionId</code>. + * @param nWays + * N-ways to search + * @return the list of entries to search + */ + static List<Long> getEntriesToSearch( + long transactionId, + LogRecordWithDLSN firstRecord, + LogRecordWithDLSN lastRecord, + int nWays) { + long txnDiff = lastRecord.getTransactionId() - firstRecord.getTransactionId(); + if (txnDiff > 0) { + if (lastRecord.getTransactionId() == transactionId) { + List<Long> entries = getEntriesToSearch( + firstRecord.getDlsn().getEntryId() + 1, + lastRecord.getDlsn().getEntryId() - 2, + Math.max(MIN_SEARCH_BATCH_SIZE, nWays - 1)); + entries.add(lastRecord.getDlsn().getEntryId() - 1); + return entries; + } else { + // TODO: improve it by estimating transaction ids. 
+ return getEntriesToSearch( + firstRecord.getDlsn().getEntryId() + 1, + lastRecord.getDlsn().getEntryId() - 1, + nWays); + } + } else { + // unexpected condition + return Lists.newArrayList(); + } + } + + static List<Long> getEntriesToSearch( + long startEntryId, + long endEntryId, + int nWays) { + if (startEntryId > endEntryId) { + return Lists.newArrayList(); + } + long numEntries = endEntryId - startEntryId + 1; + long step = Math.max(1L, numEntries / nWays); + List<Long> entryList = Lists.newArrayListWithExpectedSize(nWays); + for (long i = startEntryId, j = nWays - 1; i <= endEntryId && j > 0; i += step, j--) { + entryList.add(i); + } + if (entryList.get(entryList.size() - 1) < endEntryId) { + entryList.add(endEntryId); + } + return entryList; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java new file mode 100644 index 0000000..d25d056 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import org.apache.distributedlog.exceptions.OverCapacityException; +import org.apache.distributedlog.util.PermitLimiter; + +public class WriteLimiter { + + String streamName; + final PermitLimiter streamLimiter; + final PermitLimiter globalLimiter; + + public WriteLimiter(String streamName, PermitLimiter streamLimiter, PermitLimiter globalLimiter) { + this.streamName = streamName; + this.streamLimiter = streamLimiter; + this.globalLimiter = globalLimiter; + } + + public void acquire() throws OverCapacityException { + if (!streamLimiter.acquire()) { + throw new OverCapacityException(String.format("Stream write capacity exceeded for stream %s", streamName)); + } + try { + if (!globalLimiter.acquire()) { + throw new OverCapacityException("Global write capacity exceeded"); + } + } catch (OverCapacityException ex) { + streamLimiter.release(1); + throw ex; + } + } + + public void release() { + release(1); + } + + public void release(int permits) { + streamLimiter.release(permits); + globalLimiter.release(permits); + } + + public void close() { + streamLimiter.close(); + globalLimiter.close(); + } +}