http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java deleted file mode 100644 index 40e3930..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java +++ /dev/null @@ -1,992 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Stopwatch; -import com.google.common.base.Ticker; -import com.google.common.collect.Lists; -import com.twitter.distributedlog.callback.LogSegmentListener; -import com.twitter.distributedlog.exceptions.AlreadyTruncatedTransactionException; -import com.twitter.distributedlog.exceptions.DLIllegalStateException; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.exceptions.EndOfLogSegmentException; -import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.distributedlog.logsegment.LogSegmentEntryReader; -import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; -import com.twitter.distributedlog.logsegment.LogSegmentFilter; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.util.Function0; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Futures; -import com.twitter.util.Promise; -import org.apache.bookkeeper.stats.AlertStatsLogger; -import org.apache.bookkeeper.versioning.Versioned; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Function1; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -/** - * New ReadAhead Reader that uses {@link com.twitter.distributedlog.logsegment.LogSegmentEntryReader}. - * - * NOTE: all the state changes happen in the same thread. All *unsafe* methods should be submitted to the order - * scheduler using stream name as the key. - */ -public class ReadAheadEntryReader implements - AsyncCloseable, - LogSegmentListener, - LogSegmentEntryReader.StateChangeListener, - FutureEventListener<List<Entry.Reader>> { - - private static final Logger logger = LoggerFactory.getLogger(ReadAheadEntryReader.class); - - // - // Static Functions - // - - private static AbstractFunction1<LogSegmentEntryReader, BoxedUnit> START_READER_FUNC = new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() { - @Override - public BoxedUnit apply(LogSegmentEntryReader reader) { - reader.start(); - return BoxedUnit.UNIT; - } - }; - - // - // Internal Classes - // - - class SegmentReader implements FutureEventListener<LogSegmentEntryReader> { - - private LogSegmentMetadata metadata; - private final long startEntryId; - private Future<LogSegmentEntryReader> openFuture = null; - private LogSegmentEntryReader reader = null; - private boolean isStarted = false; - private boolean isClosed = false; - - SegmentReader(LogSegmentMetadata metadata, - long startEntryId) { - this.metadata = metadata; - this.startEntryId = startEntryId; - } - - synchronized LogSegmentEntryReader getEntryReader() { - return reader; - } - - synchronized boolean isBeyondLastAddConfirmed() { - return null != reader && reader.isBeyondLastAddConfirmed(); - } - - synchronized LogSegmentMetadata getSegment() { - return metadata; - } - - synchronized boolean isReaderOpen() { - return null != openFuture; - } - - synchronized void openReader() { - if (null != openFuture) { - return; - } - openFuture = entryStore.openReader(metadata, startEntryId).addEventListener(this); - } - - synchronized boolean isReaderStarted() { - return isStarted; - } - - synchronized void startRead() { - if (isStarted) { - return; - } - isStarted = true; - if (null != reader) { - reader.start(); - } else { - openFuture.onSuccess(START_READER_FUNC); - } - } - - synchronized Future<List<Entry.Reader>> readNext() { - if (null != reader) { - checkCatchingUpStatus(reader); - return reader.readNext(numReadAheadEntries); - } else { - return openFuture.flatMap(readFunc); - } - } - - synchronized void updateLogSegmentMetadata(final LogSegmentMetadata segment) { - if (null != reader) { - reader.onLogSegmentMetadataUpdated(segment); - this.metadata = segment; - } else { - openFuture.onSuccess(new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() { - @Override - public BoxedUnit apply(LogSegmentEntryReader reader) { - reader.onLogSegmentMetadataUpdated(segment); - synchronized (SegmentReader.this) { - SegmentReader.this.metadata = segment; - } - return BoxedUnit.UNIT; - } - }); - } - } - - @Override - synchronized public void onSuccess(LogSegmentEntryReader reader) { - this.reader = reader; - if (reader.getSegment().isInProgress()) { - reader.registerListener(ReadAheadEntryReader.this); - } - } - - @Override - public void onFailure(Throwable cause) { - // no-op, the failure will be propagated on first read. - } - - synchronized boolean isClosed() { - return isClosed; - } - - synchronized Future<Void> close() { - if (null == openFuture) { - return Future.Void(); - } - return openFuture.flatMap(new AbstractFunction1<LogSegmentEntryReader, Future<Void>>() { - @Override - public Future<Void> apply(LogSegmentEntryReader reader) { - return reader.asyncClose(); - } - }).ensure(new Function0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - synchronized (SegmentReader.this) { - isClosed = true; - } - return null; - } - }); - } - } - - private class ReadEntriesFunc extends AbstractFunction1<LogSegmentEntryReader, Future<List<Entry.Reader>>> { - - private final int numEntries; - - ReadEntriesFunc(int numEntries) { - this.numEntries = numEntries; - } - - @Override - public Future<List<Entry.Reader>> apply(LogSegmentEntryReader reader) { - checkCatchingUpStatus(reader); - return reader.readNext(numEntries); - } - } - - private abstract class CloseableRunnable implements Runnable { - - @Override - public void run() { - synchronized (ReadAheadEntryReader.this) { - if (null != closePromise) { - return; - } - } - try { - safeRun(); - } catch (Throwable cause) { - logger.error("Caught unexpected exception : ", cause); - } - } - - abstract void safeRun(); - - } - - // - // Functions - // - private final Function1<LogSegmentEntryReader, Future<List<Entry.Reader>>> readFunc; - private final Function0<BoxedUnit> removeClosedSegmentReadersFunc = new Function0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - removeClosedSegmentReaders(); - return BoxedUnit.UNIT; - } - }; - - // - // Resources - // - private final DistributedLogConfiguration conf; - private final BKLogReadHandler readHandler; - private final LogSegmentEntryStore entryStore; - private final OrderedScheduler scheduler; - - // - // Parameters - // - private final String streamName; - private final DLSN fromDLSN; - private final int maxCachedEntries; - private final int numReadAheadEntries; - private final int idleWarnThresholdMillis; - - // - // Cache - // - private final LinkedBlockingQueue<Entry.Reader> entryQueue; - - // - // State of the reader - // - - private final AtomicBoolean started = new AtomicBoolean(false); - private boolean isInitialized = false; - private boolean readAheadPaused = false; - private Promise<Void> closePromise = null; - // segment readers - private long currentSegmentSequenceNumber; - private SegmentReader currentSegmentReader; - private SegmentReader nextSegmentReader; - private DLSN lastDLSN; - private final EntryPosition nextEntryPosition; - private volatile boolean isCatchingUp = true; - private final LinkedList<SegmentReader> segmentReaders; - private final LinkedList<SegmentReader> segmentReadersToClose; - // last exception that this reader encounters - private final AtomicReference<IOException> lastException = new AtomicReference<IOException>(null); - // last entry added time - private final Stopwatch lastEntryAddedTime; - // state change notification - private final CopyOnWriteArraySet<AsyncNotification> stateChangeNotifications = - new CopyOnWriteArraySet<AsyncNotification>(); - // idle reader check task - private final ScheduledFuture<?> idleReaderCheckTask; - - // - // Stats - // - private final AlertStatsLogger alertStatsLogger; - - public ReadAheadEntryReader(String streamName, - DLSN fromDLSN, - DistributedLogConfiguration conf, - BKLogReadHandler readHandler, - LogSegmentEntryStore entryStore, - OrderedScheduler scheduler, - Ticker ticker, - AlertStatsLogger alertStatsLogger) { - this.streamName = streamName; - this.fromDLSN = lastDLSN = fromDLSN; - this.nextEntryPosition = new EntryPosition( - fromDLSN.getLogSegmentSequenceNo(), - fromDLSN.getEntryId()); - this.conf = conf; - this.maxCachedEntries = conf.getReadAheadMaxRecords(); - this.numReadAheadEntries = conf.getReadAheadBatchSize(); - this.idleWarnThresholdMillis = conf.getReaderIdleWarnThresholdMillis(); - this.readHandler = readHandler; - this.entryStore = entryStore; - this.scheduler = scheduler; - this.readFunc = new ReadEntriesFunc(numReadAheadEntries); - this.alertStatsLogger = alertStatsLogger; - - // create the segment reader list - this.segmentReaders = new LinkedList<SegmentReader>(); - this.segmentReadersToClose = new LinkedList<SegmentReader>(); - // create the readahead entry queue - this.entryQueue = new LinkedBlockingQueue<Entry.Reader>(); - - // start the idle reader detection - lastEntryAddedTime = Stopwatch.createStarted(ticker); - // start the idle reader check task - idleReaderCheckTask = scheduleIdleReaderTaskIfNecessary(); - } - - private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() { - if (idleWarnThresholdMillis < Integer.MAX_VALUE && idleWarnThresholdMillis > 0) { - return scheduler.scheduleAtFixedRate(streamName, new Runnable() { - @Override - public void run() { - if (!isReaderIdle(idleWarnThresholdMillis, TimeUnit.MILLISECONDS)) { - return; - } - // the readahead has been idle - unsafeCheckIfReadAheadIsIdle(); - } - }, idleWarnThresholdMillis, idleWarnThresholdMillis, TimeUnit.MILLISECONDS); - } - return null; - } - - private void unsafeCheckIfReadAheadIsIdle() { - boolean forceReadLogSegments = - (null == currentSegmentReader) || currentSegmentReader.isBeyondLastAddConfirmed(); - if (forceReadLogSegments) { - readHandler.readLogSegmentsFromStore( - LogSegmentMetadata.COMPARATOR, - LogSegmentFilter.DEFAULT_FILTER, - null - ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { - @Override - public void onFailure(Throwable cause) { - // do nothing here since it would be retried on next idle reader check task - } - - @Override - public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) { - onSegmentsUpdated(segments.getValue()); - } - }); - } - } - - private void cancelIdleReaderTask() { - if (null != idleReaderCheckTask) { - idleReaderCheckTask.cancel(true); - } - } - - @VisibleForTesting - EntryPosition getNextEntryPosition() { - return nextEntryPosition; - } - - @VisibleForTesting - SegmentReader getCurrentSegmentReader() { - return currentSegmentReader; - } - - @VisibleForTesting - long getCurrentSegmentSequenceNumber() { - return currentSegmentSequenceNumber; - } - - @VisibleForTesting - SegmentReader getNextSegmentReader() { - return nextSegmentReader; - } - - @VisibleForTesting - LinkedList<SegmentReader> getSegmentReaders() { - return segmentReaders; - } - - @VisibleForTesting - boolean isInitialized() { - return isInitialized; - } - - private void orderedSubmit(Runnable runnable) { - synchronized (this) { - if (null != closePromise) { - return; - } - } - try { - scheduler.submit(streamName, runnable); - } catch (RejectedExecutionException ree) { - logger.debug("Failed to submit and execute an operation for readhead entry reader of {}", - streamName, ree); - } - } - - public void start(final List<LogSegmentMetadata> segmentList) { - logger.info("Starting the readahead entry reader for {} : segments = {}", - readHandler.getFullyQualifiedName(), segmentList); - started.set(true); - processLogSegments(segmentList); - } - - private void removeClosedSegmentReaders() { - orderedSubmit(new CloseableRunnable() { - @Override - void safeRun() { - unsafeRemoveClosedSegmentReaders(); - } - }); - } - - private void unsafeRemoveClosedSegmentReaders() { - SegmentReader reader = segmentReadersToClose.peekFirst(); - while (null != reader) { - if (reader.isClosed()) { - segmentReadersToClose.pollFirst(); - reader = segmentReadersToClose.peekFirst(); - } else { - break; - } - } - } - - @Override - public Future<Void> asyncClose() { - final Promise<Void> closeFuture; - synchronized (this) { - if (null != closePromise) { - return closePromise; - } - closePromise = closeFuture = new Promise<Void>(); - } - - // cancel the idle reader task - cancelIdleReaderTask(); - - // use runnable here instead of CloseableRunnable, - // because we need this to be executed - try { - scheduler.submit(streamName, new Runnable() { - @Override - public void run() { - unsafeAsyncClose(closeFuture); - } - }); - } catch (RejectedExecutionException ree) { - logger.warn("Scheduler has been shutdown before closing the readahead entry reader for stream {}", - streamName, ree); - unsafeAsyncClose(closeFuture); - } - - return closeFuture; - } - - private void unsafeAsyncClose(Promise<Void> closePromise) { - List<Future<Void>> closeFutures = Lists.newArrayListWithExpectedSize( - segmentReaders.size() + segmentReadersToClose.size() + 1); - if (null != currentSegmentReader) { - segmentReadersToClose.add(currentSegmentReader); - } - if (null != nextSegmentReader) { - segmentReadersToClose.add(nextSegmentReader); - } - for (SegmentReader reader : segmentReaders) { - segmentReadersToClose.add(reader); - } - segmentReaders.clear(); - for (SegmentReader reader : segmentReadersToClose) { - closeFutures.add(reader.close()); - } - Futures.collect(closeFutures).proxyTo(closePromise); - } - - // - // Reader State Changes - // - - ReadAheadEntryReader addStateChangeNotification(AsyncNotification notification) { - this.stateChangeNotifications.add(notification); - return this; - } - - ReadAheadEntryReader removeStateChangeNotification(AsyncNotification notification) { - this.stateChangeNotifications.remove(notification); - return this; - } - - private void notifyStateChangeOnSuccess() { - for (AsyncNotification notification : stateChangeNotifications) { - notification.notifyOnOperationComplete(); - } - } - - private void notifyStateChangeOnFailure(Throwable cause) { - for (AsyncNotification notification : stateChangeNotifications) { - notification.notifyOnError(cause); - } - } - - void setLastException(IOException cause) { - if (!lastException.compareAndSet(null, cause)) { - logger.debug("last exception has already been set to ", lastException.get()); - } - // the exception is set and notify the state change - notifyStateChangeOnFailure(cause); - } - - void checkLastException() throws IOException { - if (null != lastException.get()) { - throw lastException.get(); - } - } - - void checkCatchingUpStatus(LogSegmentEntryReader reader) { - if (reader.getSegment().isInProgress() - && isCatchingUp - && reader.hasCaughtUpOnInprogress()) { - logger.info("ReadAhead for {} is caught up at entry {} @ log segment {}.", - new Object[] { readHandler.getFullyQualifiedName(), - reader.getLastAddConfirmed(), reader.getSegment() }); - isCatchingUp = false; - } - } - - void markCaughtup() { - if (isCatchingUp) { - isCatchingUp = false; - logger.info("ReadAhead for {} is caught up", readHandler.getFullyQualifiedName()); - } - } - - public boolean isReadAheadCaughtUp() { - return !isCatchingUp; - } - - @Override - public void onCaughtupOnInprogress() { - markCaughtup(); - } - - // - // ReadAhead State Machine - // - - @Override - public void onSuccess(List<Entry.Reader> entries) { - lastEntryAddedTime.reset().start(); - for (Entry.Reader entry : entries) { - entryQueue.add(entry); - } - if (!entries.isEmpty()) { - Entry.Reader lastEntry = entries.get(entries.size() - 1); - nextEntryPosition.advance(lastEntry.getLSSN(), lastEntry.getEntryId() + 1); - } - // notify on data available - notifyStateChangeOnSuccess(); - if (entryQueue.size() >= maxCachedEntries) { - pauseReadAheadOnCacheFull(); - } else { - scheduleReadNext(); - } - } - - @Override - public void onFailure(Throwable cause) { - if (cause instanceof EndOfLogSegmentException) { - // we reach end of the log segment - moveToNextLogSegment(); - return; - } - if (cause instanceof IOException) { - setLastException((IOException) cause); - } else { - setLastException(new UnexpectedException("Unexpected non I/O exception", cause)); - } - } - - private synchronized void invokeReadAhead() { - if (readAheadPaused) { - scheduleReadNext(); - readAheadPaused = false; - } - } - - private synchronized void pauseReadAheadOnCacheFull() { - this.readAheadPaused = true; - if (!isCacheFull()) { - invokeReadAhead(); - } - } - - private synchronized void pauseReadAheadOnNoMoreLogSegments() { - this.readAheadPaused = true; - } - - // - // Cache Related Methods - // - - public Entry.Reader getNextReadAheadEntry(long waitTime, TimeUnit waitTimeUnit) throws IOException { - if (null != lastException.get()) { - throw lastException.get(); - } - Entry.Reader entry; - try { - entry = entryQueue.poll(waitTime, waitTimeUnit); - } catch (InterruptedException e) { - throw new DLInterruptedException("Interrupted on waiting next readahead entry : ", e); - } - try { - return entry; - } finally { - // resume readahead if the cache becomes empty - if (null != entry && !isCacheFull()) { - invokeReadAhead(); - } - } - } - - /** - * Return number cached entries. - * - * @return number cached entries. - */ - public int getNumCachedEntries() { - return entryQueue.size(); - } - - /** - * Return if the cache is full. - * - * @return true if the cache is full, otherwise false. - */ - public boolean isCacheFull() { - return getNumCachedEntries() >= maxCachedEntries; - } - - @VisibleForTesting - public boolean isCacheEmpty() { - return entryQueue.isEmpty(); - } - - /** - * Check whether the readahead becomes stall. - * - * @param idleReaderErrorThreshold idle reader error threshold - * @param timeUnit time unit of the idle reader error threshold - * @return true if the readahead becomes stall, otherwise false. - */ - public boolean isReaderIdle(int idleReaderErrorThreshold, TimeUnit timeUnit) { - return (lastEntryAddedTime.elapsed(timeUnit) > idleReaderErrorThreshold); - } - - // - // LogSegment Management - // - - void processLogSegments(final List<LogSegmentMetadata> segments) { - orderedSubmit(new CloseableRunnable() { - @Override - void safeRun() { - unsafeProcessLogSegments(segments); - } - }); - } - - private void unsafeProcessLogSegments(List<LogSegmentMetadata> segments) { - if (isInitialized) { - unsafeReinitializeLogSegments(segments); - } else { - unsafeInitializeLogSegments(segments); - } - } - - /** - * Update the log segment metadata. - * - * @param reader the reader to update the metadata - * @param newMetadata the new metadata received - * @return true if successfully, false on encountering errors - */ - private boolean updateLogSegmentMetadata(SegmentReader reader, - LogSegmentMetadata newMetadata) { - if (reader.getSegment().getLogSegmentSequenceNumber() != newMetadata.getLogSegmentSequenceNumber()) { - setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for " - + streamName + " : current segment = " + reader.getSegment() + ", new segment = " + newMetadata)); - return false; - } - if (!reader.getSegment().isInProgress() && newMetadata.isInProgress()) { - setLastException(new DLIllegalStateException("An inprogress log segment " + newMetadata - + " received after a closed log segment " + reader.getSegment() + " on reading segment " - + newMetadata.getLogSegmentSequenceNumber() + " @ stream " + streamName)); - return false; - } - if (reader.getSegment().isInProgress() && !newMetadata.isInProgress()) { - reader.updateLogSegmentMetadata(newMetadata); - } - return true; - } - - /** - * Reinitialize the log segments - */ - private void unsafeReinitializeLogSegments(List<LogSegmentMetadata> segments) { - logger.info("Reinitialize log segments with {}", segments); - int segmentIdx = 0; - for (; segmentIdx < segments.size(); segmentIdx++) { - LogSegmentMetadata segment = segments.get(segmentIdx); - if (segment.getLogSegmentSequenceNumber() < currentSegmentSequenceNumber) { - continue; - } - break; - } - if (segmentIdx >= segments.size()) { - return; - } - LogSegmentMetadata segment = segments.get(segmentIdx); - if (null != currentSegmentReader) { - if (!updateLogSegmentMetadata(currentSegmentReader, segment)) { - return; - } - } else { - if (currentSegmentSequenceNumber != segment.getLogSegmentSequenceNumber()) { - setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for " - + streamName + " : current segment sn = " + currentSegmentSequenceNumber - + ", new segment sn = " + segment.getLogSegmentSequenceNumber())); - return; - } - } - segmentIdx++; - if (segmentIdx >= segments.size()) { - return; - } - // check next segment - segment = segments.get(segmentIdx); - if (null != nextSegmentReader) { - if (!updateLogSegmentMetadata(nextSegmentReader, segment)) { - return; - } - segmentIdx++; - } - // check the segment readers in the queue - for (int readerIdx = 0; - readerIdx < segmentReaders.size() && segmentIdx < segments.size(); - readerIdx++, segmentIdx++) { - SegmentReader reader = segmentReaders.get(readerIdx); - segment = segments.get(segmentIdx); - if (!updateLogSegmentMetadata(reader, segment)) { - return; - } - } - // add the remaining segments to the reader queue - for (; segmentIdx < segments.size(); segmentIdx++) { - segment = segments.get(segmentIdx); - SegmentReader reader = new SegmentReader(segment, 0L); - reader.openReader(); - segmentReaders.add(reader); - } - if (null == currentSegmentReader) { - unsafeMoveToNextLogSegment(); - } - // resume readahead if necessary - invokeReadAhead(); - } - - /** - * Initialize the reader with the log <i>segments</i>. - * - * @param segments list of log segments - */ - private void unsafeInitializeLogSegments(List<LogSegmentMetadata> segments) { - if (segments.isEmpty()) { - // not initialize the background reader, until the first log segment is notified - return; - } - boolean skipTruncatedLogSegments = true; - DLSN dlsnToStart = fromDLSN; - // positioning the reader - for (int i = 0; i < segments.size(); i++) { - LogSegmentMetadata segment = segments.get(i); - // skip any log segments that have smaller log segment sequence numbers - if (segment.getLogSegmentSequenceNumber() < fromDLSN.getLogSegmentSequenceNo()) { - continue; - } - // if the log segment is truncated, skip it. - if (skipTruncatedLogSegments && - !conf.getIgnoreTruncationStatus() && - segment.isTruncated()) { - continue; - } - // if the log segment is partially truncated, move the start dlsn to the min active dlsn - if (skipTruncatedLogSegments && - !conf.getIgnoreTruncationStatus() && - segment.isPartiallyTruncated()) { - if (segment.getMinActiveDLSN().compareTo(fromDLSN) > 0) { - dlsnToStart = segment.getMinActiveDLSN(); - } - } - skipTruncatedLogSegments = false; - if (!isAllowedToPosition(segment, dlsnToStart)) { - logger.error("segment {} is not allowed to position at {}", segment, dlsnToStart); - return; - } - - SegmentReader reader = new SegmentReader(segment, - segment.getLogSegmentSequenceNumber() == dlsnToStart.getLogSegmentSequenceNo() - ? dlsnToStart.getEntryId() : 0L); - segmentReaders.add(reader); - } - if (segmentReaders.isEmpty()) { - // not initialize the background reader, until the first log segment is available to read - return; - } - currentSegmentReader = segmentReaders.pollFirst(); - currentSegmentReader.openReader(); - currentSegmentReader.startRead(); - currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber(); - unsafeReadNext(currentSegmentReader); - if (!segmentReaders.isEmpty()) { - for (SegmentReader reader : segmentReaders) { - reader.openReader(); - } - unsafePrefetchNextSegment(true); - } - // mark the reader initialized - isInitialized = true; - } - - private void unsafePrefetchNextSegment(boolean onlyInprogressLogSegment) { - SegmentReader nextReader = segmentReaders.peekFirst(); - // open the next log segment if it is inprogress - if (null != nextReader) { - if (onlyInprogressLogSegment && !nextReader.getSegment().isInProgress()) { - return; - } - nextReader.startRead(); - nextSegmentReader = nextReader; - segmentReaders.pollFirst(); - } - } - - /** - * Check if we are allowed to position the reader at <i>fromDLSN</i>. - * - * @return true if it is allowed, otherwise false. - */ - private boolean isAllowedToPosition(LogSegmentMetadata segment, DLSN fromDLSN) { - if (segment.isTruncated() - && segment.getLastDLSN().compareTo(fromDLSN) >= 0 - && !conf.getIgnoreTruncationStatus()) { - setLastException(new AlreadyTruncatedTransactionException(streamName - + " : trying to position read ahead at " + fromDLSN - + " on a segment " + segment + " that is already marked as truncated")); - return false; - } - if (segment.isPartiallyTruncated() && - segment.getMinActiveDLSN().compareTo(fromDLSN) > 0) { - if (conf.getAlertWhenPositioningOnTruncated()) { - alertStatsLogger.raise("Trying to position reader on {} when {} is marked partially truncated", - fromDLSN, segment); - } - if (!conf.getIgnoreTruncationStatus()) { - logger.error("{}: Trying to position reader on {} when {} is marked partially truncated", - new Object[]{ streamName, fromDLSN, segment }); - - setLastException(new AlreadyTruncatedTransactionException(streamName - + " : trying to position read ahead at " + fromDLSN - + " on a segment " + segment + " that is already marked as truncated")); - return false; - } - } - return true; - } - - void moveToNextLogSegment() { - orderedSubmit(new CloseableRunnable() { - @Override - void safeRun() { - unsafeMoveToNextLogSegment(); - } - }); - } - - private void unsafeMoveToNextLogSegment() { - if (null != currentSegmentReader) { - segmentReadersToClose.add(currentSegmentReader); - currentSegmentReader.close().ensure(removeClosedSegmentReadersFunc); - logger.debug("close current segment reader {}", currentSegmentReader.getSegment()); - currentSegmentReader = null; - } - boolean hasSegmentToRead = false; - if (null != nextSegmentReader) { - currentSegmentReader = nextSegmentReader; - logger.debug("move to read segment {}", currentSegmentReader.getSegment()); - currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber(); - nextSegmentReader = null; - // start reading - unsafeReadNext(currentSegmentReader); - unsafePrefetchNextSegment(true); - hasSegmentToRead = true; - } else { - unsafePrefetchNextSegment(false); - if (null != nextSegmentReader) { - currentSegmentReader = nextSegmentReader; - logger.debug("move to read segment {}", currentSegmentReader.getSegment()); - currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber(); - nextSegmentReader = null; - unsafeReadNext(currentSegmentReader); - unsafePrefetchNextSegment(true); - hasSegmentToRead = true; - } - } - if (!hasSegmentToRead) { // no more segment to read, wait until new log segment arrive - if (isCatchingUp) { - logger.info("ReadAhead for {} is caught up and no log segments to read now", - readHandler.getFullyQualifiedName()); - isCatchingUp = false; - } - pauseReadAheadOnNoMoreLogSegments(); - } - } - - void scheduleReadNext() { - orderedSubmit(new CloseableRunnable() { - @Override - void safeRun() { - if (null == currentSegmentReader) { - pauseReadAheadOnNoMoreLogSegments(); - return; - } - unsafeReadNext(currentSegmentReader); - } - }); - } - - private void unsafeReadNext(SegmentReader reader) { - reader.readNext().addEventListener(this); - } - - @Override - public void onSegmentsUpdated(List<LogSegmentMetadata> segments) { - if (!started.get()) { - return; - } - logger.info("segments is updated with {}", segments); - processLogSegments(segments); - } - - @Override - public void onLogStreamDeleted() { - setLastException(new LogNotFoundException("Log stream " - + streamName + " is deleted")); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java deleted file mode 100644 index f481561..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java +++ /dev/null @@ -1,782 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; -import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.twitter.distributedlog.selector.FirstDLSNNotLessThanSelector; -import com.twitter.distributedlog.selector.FirstTxIdNotLessThanSelector; -import com.twitter.distributedlog.selector.LastRecordSelector; -import com.twitter.distributedlog.selector.LogRecordSelector; -import com.twitter.distributedlog.util.FutureUtils.FutureEventListenerRunnable; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction0; -import scala.runtime.BoxedUnit; - -/** - * Utility function for readers - */ -public class ReadUtils { - - static final Logger LOG = LoggerFactory.getLogger(ReadUtils.class); - - private static final int MIN_SEARCH_BATCH_SIZE = 2; - - // - // Read First & Last Record Functions - // - - /** - * Read last record from a log segment. - * - * @param streamName - * fully qualified stream name (used for logging) - * @param l - * log segment metadata. - * @param fence - * whether to fence the log segment. - * @param includeControl - * whether to include control record. - * @param includeEndOfStream - * whether to include end of stream. - * @param scanStartBatchSize - * first num entries used for read last record scan - * @param scanMaxBatchSize - * max num entries used for read last record scan - * @param numRecordsScanned - * num of records scanned to get last record - * @param executorService - * executor service used for processing entries - * @param entryStore - * log segment entry store - * @return a future with last record. - */ - public static Future<LogRecordWithDLSN> asyncReadLastRecord( - final String streamName, - final LogSegmentMetadata l, - final boolean fence, - final boolean includeControl, - final boolean includeEndOfStream, - final int scanStartBatchSize, - final int scanMaxBatchSize, - final AtomicInteger numRecordsScanned, - final ExecutorService executorService, - final LogSegmentEntryStore entryStore) { - final LogRecordSelector selector = new LastRecordSelector(); - return asyncReadRecord(streamName, l, fence, includeControl, includeEndOfStream, scanStartBatchSize, - scanMaxBatchSize, numRecordsScanned, executorService, entryStore, - selector, true /* backward */, 0L); - } - - /** - * Read first record from a log segment with a DLSN larger than that given. - * - * @param streamName - * fully qualified stream name (used for logging) - * @param l - * log segment metadata. - * @param scanStartBatchSize - * first num entries used for read last record scan - * @param scanMaxBatchSize - * max num entries used for read last record scan - * @param numRecordsScanned - * num of records scanned to get last record - * @param executorService - * executor service used for processing entries - * @param entryStore - * log segment entry store - * @param dlsn - * threshold dlsn - * @return a future with last record. - */ - public static Future<LogRecordWithDLSN> asyncReadFirstUserRecord( - final String streamName, - final LogSegmentMetadata l, - final int scanStartBatchSize, - final int scanMaxBatchSize, - final AtomicInteger numRecordsScanned, - final ExecutorService executorService, - final LogSegmentEntryStore entryStore, - final DLSN dlsn) { - long startEntryId = 0L; - if (l.getLogSegmentSequenceNumber() == dlsn.getLogSegmentSequenceNo()) { - startEntryId = dlsn.getEntryId(); - } - final LogRecordSelector selector = new FirstDLSNNotLessThanSelector(dlsn); - return asyncReadRecord(streamName, l, false, false, false, scanStartBatchSize, - scanMaxBatchSize, numRecordsScanned, executorService, entryStore, - selector, false /* backward */, startEntryId); - } - - // - // Private methods for scanning log segments - // - - private static class ScanContext { - // variables to about current scan state - final AtomicInteger numEntriesToScan; - final AtomicLong curStartEntryId; - final AtomicLong curEndEntryId; - - // scan settings - final long startEntryId; - final long endEntryId; - final int scanStartBatchSize; - final int scanMaxBatchSize; - final boolean includeControl; - final boolean includeEndOfStream; - final boolean backward; - - // number of records scanned - final AtomicInteger numRecordsScanned; - - ScanContext(long startEntryId, long endEntryId, - int scanStartBatchSize, - int scanMaxBatchSize, - boolean includeControl, - boolean includeEndOfStream, - boolean backward, - AtomicInteger numRecordsScanned) { - this.startEntryId = startEntryId; - this.endEntryId = endEntryId; - this.scanStartBatchSize = scanStartBatchSize; - this.scanMaxBatchSize = scanMaxBatchSize; - this.includeControl = includeControl; - this.includeEndOfStream = includeEndOfStream; - this.backward = backward; - // Scan state - this.numEntriesToScan = new AtomicInteger(scanStartBatchSize); - if (backward) { - this.curStartEntryId = new AtomicLong( - Math.max(startEntryId, (endEntryId - scanStartBatchSize + 1))); - this.curEndEntryId = new AtomicLong(endEntryId); - } else { - this.curStartEntryId = new AtomicLong(startEntryId); - this.curEndEntryId = new AtomicLong( - Math.min(endEntryId, (startEntryId + scanStartBatchSize - 1))); - } - this.numRecordsScanned = numRecordsScanned; - } - - boolean moveToNextRange() { - if (backward) { - return moveBackward(); - } else { - return moveForward(); - } - } - - boolean moveBackward() { - long nextEndEntryId = curStartEntryId.get() - 1; - if (nextEndEntryId < startEntryId) { - // no entries to read again - return false; - } - curEndEntryId.set(nextEndEntryId); - // update num entries to scan - numEntriesToScan.set( - Math.min(numEntriesToScan.get() * 2, scanMaxBatchSize)); - // update start entry id - curStartEntryId.set(Math.max(startEntryId, nextEndEntryId - numEntriesToScan.get() + 1)); - return true; - } - - boolean moveForward() { - long nextStartEntryId = curEndEntryId.get() + 1; - if (nextStartEntryId > endEntryId) { - // no entries to read again - return false; - } - curStartEntryId.set(nextStartEntryId); - // update num entries to scan - numEntriesToScan.set( - Math.min(numEntriesToScan.get() * 2, scanMaxBatchSize)); - // update start entry id - curEndEntryId.set(Math.min(endEntryId, nextStartEntryId + numEntriesToScan.get() - 1)); - return true; - } - } - - private static class SingleEntryScanContext extends ScanContext { - SingleEntryScanContext(long entryId) { - super(entryId, entryId, 1, 1, true, true, false, new AtomicInteger(0)); - } - } - - /** - * Read record from a given range of log segment entries. - * - * @param streamName - * fully qualified stream name (used for logging) - * @param reader - * log segment random access reader - * @param executorService - * executor service used for processing entries - * @param context - * scan context - * @return a future with the log record. - */ - private static Future<LogRecordWithDLSN> asyncReadRecordFromEntries( - final String streamName, - final LogSegmentRandomAccessEntryReader reader, - final LogSegmentMetadata metadata, - final ExecutorService executorService, - final ScanContext context, - final LogRecordSelector selector) { - final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>(); - final long startEntryId = context.curStartEntryId.get(); - final long endEntryId = context.curEndEntryId.get(); - if (LOG.isDebugEnabled()) { - LOG.debug("{} reading entries [{} - {}] from {}.", - new Object[] { streamName, startEntryId, endEntryId, metadata}); - } - FutureEventListener<List<Entry.Reader>> readEntriesListener = - new FutureEventListener<List<Entry.Reader>>() { - @Override - public void onSuccess(final List<Entry.Reader> entries) { - if (LOG.isDebugEnabled()) { - LOG.debug("{} finished reading entries [{} - {}] from {}", - new Object[]{ streamName, startEntryId, endEntryId, metadata}); - } - for (Entry.Reader entry : entries) { - try { - visitEntryRecords(entry, context, selector); - } catch (IOException ioe) { - // exception is only thrown due to bad ledger entry, so it might be corrupted - // we shouldn't do anything beyond this point. throw the exception to application - promise.setException(ioe); - return; - } - } - - LogRecordWithDLSN record = selector.result(); - if (LOG.isDebugEnabled()) { - LOG.debug("{} got record from entries [{} - {}] of {} : {}", - new Object[]{streamName, startEntryId, endEntryId, - metadata, record}); - } - promise.setValue(record); - } - - @Override - public void onFailure(final Throwable cause) { - promise.setException(cause); - } - }; - reader.readEntries(startEntryId, endEntryId) - .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService)); - return promise; - } - - /** - * Process each record using LogRecordSelector. - * - * @param entry - * ledger entry - * @param context - * scan context - * @return log record with dlsn inside the ledger entry - * @throws IOException - */ - private static void visitEntryRecords( - Entry.Reader entry, - ScanContext context, - LogRecordSelector selector) throws IOException { - LogRecordWithDLSN nextRecord = entry.nextRecord(); - while (nextRecord != null) { - LogRecordWithDLSN record = nextRecord; - nextRecord = entry.nextRecord(); - context.numRecordsScanned.incrementAndGet(); - if (!context.includeControl && record.isControl()) { - continue; - } - if (!context.includeEndOfStream && record.isEndOfStream()) { - continue; - } - selector.process(record); - } - } - - /** - * Scan entries for the given record. - * - * @param streamName - * fully qualified stream name (used for logging) - * @param reader - * log segment random access reader - * @param executorService - * executor service used for processing entries - * @param promise - * promise to return desired record. - * @param context - * scan context - */ - private static void asyncReadRecordFromEntries( - final String streamName, - final LogSegmentRandomAccessEntryReader reader, - final LogSegmentMetadata metadata, - final ExecutorService executorService, - final Promise<LogRecordWithDLSN> promise, - final ScanContext context, - final LogRecordSelector selector) { - FutureEventListener<LogRecordWithDLSN> readEntriesListener = - new FutureEventListener<LogRecordWithDLSN>() { - @Override - public void onSuccess(LogRecordWithDLSN value) { - if (LOG.isDebugEnabled()) { - LOG.debug("{} read record from [{} - {}] of {} : {}", - new Object[]{streamName, context.curStartEntryId.get(), context.curEndEntryId.get(), - metadata, value}); - } - if (null != value) { - promise.setValue(value); - return; - } - if (!context.moveToNextRange()) { - // no entries to read again - promise.setValue(null); - return; - } - // scan next range - asyncReadRecordFromEntries(streamName, - reader, - metadata, - executorService, - promise, - context, - selector); - } - - @Override - public void onFailure(Throwable cause) { - promise.setException(cause); - } - }; - asyncReadRecordFromEntries(streamName, reader, metadata, executorService, context, selector) - .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService)); - } - - private static void asyncReadRecordFromLogSegment( - final String streamName, - final LogSegmentRandomAccessEntryReader reader, - final LogSegmentMetadata metadata, - final ExecutorService executorService, - final int scanStartBatchSize, - final int scanMaxBatchSize, - final boolean includeControl, - final boolean includeEndOfStream, - final Promise<LogRecordWithDLSN> promise, - final AtomicInteger numRecordsScanned, - final LogRecordSelector selector, - final boolean backward, - final long startEntryId) { - final long lastAddConfirmed = reader.getLastAddConfirmed(); - if (lastAddConfirmed < 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Log segment {} is empty for {}.", new Object[] { metadata, streamName }); - } - promise.setValue(null); - return; - } - final ScanContext context = new ScanContext( - startEntryId, lastAddConfirmed, - scanStartBatchSize, scanMaxBatchSize, - includeControl, includeEndOfStream, backward, numRecordsScanned); - asyncReadRecordFromEntries(streamName, reader, metadata, executorService, - promise, context, selector); - } - - private static Future<LogRecordWithDLSN> asyncReadRecord( - final String streamName, - final LogSegmentMetadata l, - final boolean fence, - final boolean includeControl, - final boolean includeEndOfStream, - final int scanStartBatchSize, - final int scanMaxBatchSize, - final AtomicInteger numRecordsScanned, - final ExecutorService executorService, - final LogSegmentEntryStore entryStore, - final LogRecordSelector selector, - final boolean backward, - final long startEntryId) { - - final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>(); - - FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener = - new FutureEventListener<LogSegmentRandomAccessEntryReader>() { - @Override - public void onSuccess(final LogSegmentRandomAccessEntryReader reader) { - if (LOG.isDebugEnabled()) { - LOG.debug("{} Opened log segment {} for reading record", - streamName, l); - } - promise.ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - reader.asyncClose(); - return BoxedUnit.UNIT; - } - }); - if (LOG.isDebugEnabled()) { - LOG.debug("{} {} scanning {}.", new Object[]{ - (backward ? "backward" : "forward"), streamName, l}); - } - asyncReadRecordFromLogSegment( - streamName, reader, l, executorService, - scanStartBatchSize, scanMaxBatchSize, - includeControl, includeEndOfStream, - promise, numRecordsScanned, selector, backward, startEntryId); - } - - @Override - public void onFailure(final Throwable cause) { - promise.setException(cause); - } - }; - entryStore.openRandomAccessReader(l, fence) - .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService)); - return promise; - } - - // - // Search Functions - // - - /** - * Get the log record whose transaction id is not less than provided <code>transactionId</code>. - * - * <p> - * It uses a binary-search like algorithm to find the log record whose transaction id is not less than - * provided <code>transactionId</code> within a log <code>segment</code>. You could think of a log segment - * in terms of a sequence of records whose transaction ids are non-decreasing. - * - * - The sequence of records within a log segment is divided into N pieces. - * - Find the piece of records that contains a record whose transaction id is not less than provided - * <code>transactionId</code>. - * - * N could be chosen based on trading off concurrency and latency. - * </p> - * - * @param logName - * name of the log - * @param segment - * metadata of the log segment - * @param transactionId - * transaction id - * @param executorService - * executor service used for processing entries - * @param entryStore - * log segment entry store - * @param nWays - * how many number of entries to search in parallel - * @return found log record. none if all transaction ids are less than provided <code>transactionId</code>. - */ - public static Future<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId( - final String logName, - final LogSegmentMetadata segment, - final long transactionId, - final ExecutorService executorService, - final LogSegmentEntryStore entryStore, - final int nWays) { - if (!segment.isInProgress()) { - if (segment.getLastTxId() < transactionId) { - // all log records whose transaction id is less than provided transactionId - // then return none - Optional<LogRecordWithDLSN> noneRecord = Optional.absent(); - return Future.value(noneRecord); - } - } - - final Promise<Optional<LogRecordWithDLSN>> promise = - new Promise<Optional<LogRecordWithDLSN>>(); - final FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener = - new FutureEventListener<LogSegmentRandomAccessEntryReader>() { - @Override - public void onSuccess(final LogSegmentRandomAccessEntryReader reader) { - promise.ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - reader.asyncClose(); - return BoxedUnit.UNIT; - } - - }); - long lastEntryId = reader.getLastAddConfirmed(); - if (lastEntryId < 0) { - // it means that the log segment is created but not written yet or an empty log segment. - // it is equivalent to 'all log records whose transaction id is less than provided transactionId' - Optional<LogRecordWithDLSN> nonRecord = Optional.absent(); - promise.setValue(nonRecord); - return; - } - // all log records whose transaction id is not less than provided transactionId - if (segment.getFirstTxId() >= transactionId) { - final FirstTxIdNotLessThanSelector selector = - new FirstTxIdNotLessThanSelector(transactionId); - asyncReadRecordFromEntries( - logName, - reader, - segment, - executorService, - new SingleEntryScanContext(0L), - selector - ).addEventListener(new FutureEventListener<LogRecordWithDLSN>() { - @Override - public void onSuccess(LogRecordWithDLSN value) { - promise.setValue(Optional.of(selector.result())); - } - - @Override - public void onFailure(Throwable cause) { - promise.setException(cause); - } - }); - - return; - } - getLogRecordNotLessThanTxIdFromEntries( - logName, - segment, - transactionId, - executorService, - reader, - Lists.newArrayList(0L, lastEntryId), - nWays, - Optional.<LogRecordWithDLSN>absent(), - promise); - } - - @Override - public void onFailure(final Throwable cause) { - promise.setException(cause); - } - }; - - entryStore.openRandomAccessReader(segment, false) - .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService)); - return promise; - } - - /** - * Find the log record whose transaction id is not less than provided <code>transactionId</code> from - * entries between <code>startEntryId</code> and <code>endEntryId</code>. - * - * @param logName - * name of the log - * @param segment - * log segment - * @param transactionId - * provided transaction id to search - * @param executorService - * executor service - * @param reader - * log segment random access reader - * @param entriesToSearch - * list of entries to search - * @param nWays - * how many entries to search in parallel - * @param prevFoundRecord - * the log record found in previous search - * @param promise - * promise to satisfy the result - */ - private static void getLogRecordNotLessThanTxIdFromEntries( - final String logName, - final LogSegmentMetadata segment, - final long transactionId, - final ExecutorService executorService, - final LogSegmentRandomAccessEntryReader reader, - final List<Long> entriesToSearch, - final int nWays, - final Optional<LogRecordWithDLSN> prevFoundRecord, - final Promise<Optional<LogRecordWithDLSN>> promise) { - final List<Future<LogRecordWithDLSN>> searchResults = - Lists.newArrayListWithExpectedSize(entriesToSearch.size()); - for (Long entryId : entriesToSearch) { - LogRecordSelector selector = new FirstTxIdNotLessThanSelector(transactionId); - Future<LogRecordWithDLSN> searchResult = asyncReadRecordFromEntries( - logName, - reader, - segment, - executorService, - new SingleEntryScanContext(entryId), - selector); - searchResults.add(searchResult); - } - FutureEventListener<List<LogRecordWithDLSN>> processSearchResultsListener = - new FutureEventListener<List<LogRecordWithDLSN>>() { - @Override - public void onSuccess(List<LogRecordWithDLSN> resultList) { - processSearchResults( - logName, - segment, - transactionId, - executorService, - reader, - resultList, - nWays, - prevFoundRecord, - promise); - } - - @Override - public void onFailure(Throwable cause) { - promise.setException(cause); - } - }; - Future.collect(searchResults).addEventListener( - FutureEventListenerRunnable.of(processSearchResultsListener, executorService)); - } - - /** - * Process the search results - */ - static void processSearchResults( - final String logName, - final LogSegmentMetadata segment, - final long transactionId, - final ExecutorService executorService, - final LogSegmentRandomAccessEntryReader reader, - final List<LogRecordWithDLSN> searchResults, - final int nWays, - final Optional<LogRecordWithDLSN> prevFoundRecord, - final Promise<Optional<LogRecordWithDLSN>> promise) { - int found = -1; - for (int i = 0; i < searchResults.size(); i++) { - LogRecordWithDLSN record = searchResults.get(i); - if (record.getTransactionId() >= transactionId) { - found = i; - break; - } - } - if (found == -1) { // all log records' transaction id is less than provided transaction id - promise.setValue(prevFoundRecord); - return; - } - // we found a log record - LogRecordWithDLSN foundRecord = searchResults.get(found); - - // we found it - // - it is not the first record - // - it is the first record in first search entry - // - its entry is adjacent to previous search entry - if (foundRecord.getDlsn().getSlotId() != 0L - || found == 0 - || foundRecord.getDlsn().getEntryId() == (searchResults.get(found - 1).getDlsn().getEntryId() + 1)) { - promise.setValue(Optional.of(foundRecord)); - return; - } - - // otherwise, we need to search - List<Long> nextSearchBatch = getEntriesToSearch( - transactionId, - searchResults.get(found - 1), - searchResults.get(found), - nWays); - if (nextSearchBatch.isEmpty()) { - promise.setValue(prevFoundRecord); - return; - } - getLogRecordNotLessThanTxIdFromEntries( - logName, - segment, - transactionId, - executorService, - reader, - nextSearchBatch, - nWays, - Optional.of(foundRecord), - promise); - } - - /** - * Get the entries to search provided <code>transactionId</code> between - * <code>firstRecord</code> and <code>lastRecord</code>. <code>firstRecord</code> - * and <code>lastRecord</code> are already searched, which the transaction id - * of <code>firstRecord</code> is less than <code>transactionId</code> and the - * transaction id of <code>lastRecord</code> is not less than <code>transactionId</code>. - * - * @param transactionId - * transaction id to search - * @param firstRecord - * log record that already searched whose transaction id is leass than <code>transactionId</code>. - * @param lastRecord - * log record that already searched whose transaction id is not less than <code>transactionId</code>. - * @param nWays - * N-ways to search - * @return the list of entries to search - */ - static List<Long> getEntriesToSearch( - long transactionId, - LogRecordWithDLSN firstRecord, - LogRecordWithDLSN lastRecord, - int nWays) { - long txnDiff = lastRecord.getTransactionId() - firstRecord.getTransactionId(); - if (txnDiff > 0) { - if (lastRecord.getTransactionId() == transactionId) { - List<Long> entries = getEntriesToSearch( - firstRecord.getDlsn().getEntryId() + 1, - lastRecord.getDlsn().getEntryId() - 2, - Math.max(MIN_SEARCH_BATCH_SIZE, nWays - 1)); - entries.add(lastRecord.getDlsn().getEntryId() - 1); - return entries; - } else { - // TODO: improve it by estimating transaction ids. - return getEntriesToSearch( - firstRecord.getDlsn().getEntryId() + 1, - lastRecord.getDlsn().getEntryId() - 1, - nWays); - } - } else { - // unexpected condition - return Lists.newArrayList(); - } - } - - static List<Long> getEntriesToSearch( - long startEntryId, - long endEntryId, - int nWays) { - if (startEntryId > endEntryId) { - return Lists.newArrayList(); - } - long numEntries = endEntryId - startEntryId + 1; - long step = Math.max(1L, numEntries / nWays); - List<Long> entryList = Lists.newArrayListWithExpectedSize(nWays); - for (long i = startEntryId, j = nWays - 1; i <= endEntryId && j > 0; i += step, j--) { - entryList.add(i); - } - if (entryList.get(entryList.size() - 1) < endEntryId) { - entryList.add(endEntryId); - } - return entryList; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java deleted file mode 100644 index 0b24c1a..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.twitter.distributedlog.exceptions.OverCapacityException; -import com.twitter.distributedlog.util.PermitLimiter; - -public class WriteLimiter { - - String streamName; - final PermitLimiter streamLimiter; - final PermitLimiter globalLimiter; - - public WriteLimiter(String streamName, PermitLimiter streamLimiter, PermitLimiter globalLimiter) { - this.streamName = streamName; - this.streamLimiter = streamLimiter; - this.globalLimiter = globalLimiter; - } - - public void acquire() throws OverCapacityException { - if (!streamLimiter.acquire()) { - throw new OverCapacityException(String.format("Stream write capacity exceeded for stream %s", streamName)); - } - try { - if (!globalLimiter.acquire()) { - throw new OverCapacityException("Global write capacity exceeded"); - } - } catch (OverCapacityException ex) { - streamLimiter.release(1); - throw ex; - } - } - - public void release() { - release(1); - } - - public void release(int permits) { - streamLimiter.release(permits); - globalLimiter.release(permits); - } - - public void close() { - streamLimiter.close(); - globalLimiter.close(); - } -}