DL-195: ReadAhead Improvement (part 1) - Interface for LogSegmentEntryReader and LogSegmentEntryWriter
Create interface for log segment entry reader and writer to abstract data write/read operations. Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/27c04f37 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/27c04f37 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/27c04f37 Branch: refs/heads/master Commit: 27c04f37b1b564a06d9105acb1849a6d98449ca9 Parents: 98a29a5 Author: Sijie Guo <sij...@twitter.com> Authored: Wed Dec 28 14:51:54 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Thu Dec 29 02:11:23 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/BKLogWriteHandler.java | 2 +- .../DistributedLogConfiguration.java | 44 ++ .../impl/BKLogSegmentEntryWriter.java | 61 -- .../logsegment/BKLogSegmentEntryReader.java | 740 +++++++++++++++++++ .../logsegment/BKLogSegmentEntryWriter.java | 61 ++ .../distributedlog/impl/logsegment/BKUtils.java | 72 ++ .../logsegment/LogSegmentEntryReader.java | 67 ++ .../logsegment/LogSegmentEntryStore.java | 46 ++ .../com/twitter/distributedlog/DLMTestUtil.java | 2 +- .../distributedlog/TestBKLogSegmentWriter.java | 2 +- .../distributedlog/TestDistributedLogBase.java | 2 +- .../logsegment/TestBKLogSegmentEntryReader.java | 553 ++++++++++++++ .../exceptions/EndOfLogSegmentException.java | 32 + .../src/main/thrift/service.thrift | 2 + 14 files changed, 1621 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java index f8bc917..16c0111 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java @@ -29,7 +29,7 @@ import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException; import com.twitter.distributedlog.exceptions.UnexpectedException; import com.twitter.distributedlog.function.GetLastTxIdFunction; -import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter; +import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter; import com.twitter.distributedlog.metadata.LogMetadataForWriter; import com.twitter.distributedlog.lock.DistributedLock; import com.twitter.distributedlog.logsegment.LogSegmentFilter; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java index 6f37a59..6c6017e 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java @@ -352,6 +352,10 @@ public class DistributedLogConfiguration extends CompositeConfiguration { public static final int BKDL_READAHEAD_NOSUCHLEDGER_EXCEPTION_ON_READLAC_ERROR_THRESHOLD_MILLIS_DEFAULT = 10000; public static final String BKDL_READAHEAD_SKIP_BROKEN_ENTRIES = "readAheadSkipBrokenEntries"; public static final boolean BKDL_READAHEAD_SKIP_BROKEN_ENTRIES_DEFAULT = false; + public static 
final String BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT = "numPrefetchEntriesPerLogSegment"; + public static final int BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT = 4; + public static final String BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT = "maxPrefetchEntriesPerLogSegment"; + public static final int BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT = 32; // Scan Settings public static final String BKDL_FIRST_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN = "firstNumEntriesEachPerLastRecordScan"; @@ -2770,6 +2774,46 @@ public class DistributedLogConfiguration extends CompositeConfiguration { return this; } + /** + * Get the number of prefetch entries per log segment. Default value is 4. + * + * @return the number of prefetch entries per log segment. + */ + public int getNumPrefetchEntriesPerLogSegment() { + return getInt(BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT, BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT); + } + + /** + * Set the number of prefetch entries per log segment. + * + * @param numEntries the number of prefetch entries per log segment. + * @return configuration + */ + public DistributedLogConfiguration setNumPrefetchEntriesPerLogSegment(int numEntries) { + setProperty(BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT, numEntries); + return this; + } + + /** + * Get the maximum number of prefetch entries per log segment. Default value is 32. + * + * @return the maximum number of prefetch entries per log segment. + */ + public int getMaxPrefetchEntriesPerLogSegment() { + return getInt(BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT, BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT); + } + + /** + * Set the maximum number of prefetch entries per log segment. + * + * @param numEntries the maximum number of prefetch entries per log segment.
+ * @return configuration + */ + public DistributedLogConfiguration setMaxPrefetchEntriesPerLogSegment(int numEntries) { + setProperty(BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT, numEntries); + return this; + } + // // DL Reader Scan Settings // http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKLogSegmentEntryWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKLogSegmentEntryWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKLogSegmentEntryWriter.java deleted file mode 100644 index 57b4e69..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKLogSegmentEntryWriter.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.impl; - -import com.google.common.annotations.VisibleForTesting; -import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; -import org.apache.bookkeeper.client.AsyncCallback; -import org.apache.bookkeeper.client.LedgerHandle; - -/** - * Ledger based log segment entry writer. - */ -public class BKLogSegmentEntryWriter implements LogSegmentEntryWriter { - - private final LedgerHandle lh; - - public BKLogSegmentEntryWriter(LedgerHandle lh) { - this.lh = lh; - } - - @VisibleForTesting - public LedgerHandle getLedgerHandle() { - return this.lh; - } - - @Override - public long getLogSegmentId() { - return lh.getId(); - } - - @Override - public void asyncClose(AsyncCallback.CloseCallback callback, Object ctx) { - lh.asyncClose(callback, ctx); - } - - @Override - public void asyncAddEntry(byte[] data, int offset, int length, - AsyncCallback.AddCallback callback, Object ctx) { - lh.asyncAddEntry(data, offset, length, callback, ctx); - } - - @Override - public long size() { - return lh.getLength(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java new file mode 100644 index 0000000..fd3b63f --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java @@ -0,0 +1,740 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.impl.logsegment; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.Entry; +import com.twitter.distributedlog.LogSegmentMetadata; +import com.twitter.distributedlog.exceptions.BKTransmitException; +import com.twitter.distributedlog.exceptions.DLIllegalStateException; +import com.twitter.distributedlog.exceptions.DLInterruptedException; +import com.twitter.distributedlog.exceptions.EndOfLogSegmentException; +import com.twitter.distributedlog.exceptions.ReadCancelledException; +import com.twitter.distributedlog.logsegment.LogSegmentEntryReader; +import com.twitter.distributedlog.util.FutureUtils; +import com.twitter.distributedlog.util.OrderedScheduler; +import com.twitter.util.Future; +import com.twitter.util.Promise; +import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.LedgerHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import 
java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Charsets.UTF_8; + +/** + * BookKeeper ledger based log segment entry reader. + */ +public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, AsyncCallback.OpenCallback { + + private static final Logger logger = LoggerFactory.getLogger(BKLogSegmentEntryReader.class); + + private class CacheEntry implements Runnable, AsyncCallback.ReadCallback, + AsyncCallback.ReadLastConfirmedAndEntryCallback { + + protected final long entryId; + private boolean done; + private LedgerEntry entry; + private int rc; + + private CacheEntry(long entryId) { + this.entryId = entryId; + this.entry = null; + this.rc = BKException.Code.UnexpectedConditionException; + this.done = false; + } + + long getEntryId() { + return entryId; + } + + synchronized boolean isDone() { + return done; + } + + void setValue(LedgerEntry entry) { + synchronized (this) { + if (done) { + return; + } + this.rc = BKException.Code.OK; + this.entry = entry; + } + setDone(true); + } + + void setException(int rc) { + synchronized (this) { + if (done) { + return; + } + this.rc = rc; + } + setDone(false); + } + + void setDone(boolean success) { + synchronized (this) { + this.done = true; + } + onReadEntryDone(success); + } + + synchronized boolean isSuccess() { + return BKException.Code.OK == rc; + } + + synchronized LedgerEntry getEntry() { + return this.entry; + } + + synchronized int getRc() { + return rc; + } + + @Override + public void readComplete(int rc, + LedgerHandle lh, + Enumeration<LedgerEntry> entries, + Object ctx) { + if (isDone()) { + return; + } + if (!checkReturnCodeAndHandleFailure(rc, false)) { + return; + } + LedgerEntry entry = null; + while (entries.hasMoreElements()) { + // more entries are returned + if 
(null != entry) { + setException(BKException.Code.UnexpectedConditionException); + return; + } + entry = entries.nextElement(); + } + if (null == entry || entry.getEntryId() != entryId) { + setException(BKException.Code.UnexpectedConditionException); + return; + } + setValue(entry); + } + + @Override + public void readLastConfirmedAndEntryComplete(int rc, + long entryId, + LedgerEntry entry, + Object ctx) { + if (isDone()) { + return; + } + if (!checkReturnCodeAndHandleFailure(rc, true)) { + return; + } + if (null != entry && this.entryId == entryId) { + setValue(entry); + return; + } + // the long poll timed out or was interrupted without returning this entry; issue it again. + issueRead(this); + } + + /** + * Check the return code; on a retryable error schedule a backed-off retry, otherwise fail this entry. + * + * @param rc the return code + * @param isLongPoll whether this is a long poll request (NoSuchLedgerExists is then treated as retryable) + * @return true if the read succeeded; false if it failed (a retry may have been scheduled) + */ + boolean checkReturnCodeAndHandleFailure(int rc, boolean isLongPoll) { + if (BKException.Code.OK == rc) { + numReadErrors.set(0); + return true; + } + if (BKException.Code.BookieHandleNotAvailableException == rc || + (isLongPoll && BKException.Code.NoSuchLedgerExistsException == rc)) { + int numErrors = Math.max(1, numReadErrors.incrementAndGet()); + int nextReadBackoffTime = Math.min(numErrors * readAheadWaitTime, maxReadBackoffTime); + scheduler.schedule( + this, + nextReadBackoffTime, + TimeUnit.MILLISECONDS); + } else { + setException(rc); + } + return false; + } + + @Override + public void run() { + issueRead(this); + } + } + + private class PendingReadRequest { + private final int numEntries; + private final List<Entry.Reader> entries; + private final Promise<List<Entry.Reader>> promise; + + PendingReadRequest(int numEntries) { + this.numEntries = numEntries; + if (numEntries == 1) { + this.entries = new ArrayList<Entry.Reader>(1); + } else { + this.entries = new ArrayList<Entry.Reader>(); + } + this.promise = new Promise<List<Entry.Reader>>(); + } + + Promise<List<Entry.Reader>> getPromise() { + return
promise; + } + + void setException(Throwable throwable) { + FutureUtils.setException(promise, throwable); + } + + void addEntry(Entry.Reader entry) { + entries.add(entry); + } + + void complete() { + FutureUtils.setValue(promise, entries); + onEntriesConsumed(entries.size()); + } + + boolean hasReadEntries() { + return entries.size() > 0; + } + + boolean hasReadEnoughEntries() { + return entries.size() >= numEntries; + } + } + + private final BookKeeper bk; + private final DistributedLogConfiguration conf; + private final OrderedScheduler scheduler; + private final long startEntryId; + private final long lssn; + private final long startSequenceId; + private final boolean envelopeEntries; + private final int numPrefetchEntries; + private final int maxPrefetchEntries; + // state + private Promise<Void> closePromise = null; + private LogSegmentMetadata metadata; + private LedgerHandle lh; + private final List<LedgerHandle> openLedgerHandles; + private CacheEntry outstandingLongPoll; + private long nextEntryId; + private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(null); + private final AtomicLong scheduleCount = new AtomicLong(0); + // read retries + private int readAheadWaitTime; + private final int maxReadBackoffTime; + private final AtomicInteger numReadErrors = new AtomicInteger(0); + // readahead cache + int cachedEntries = 0; + int numOutstandingEntries = 0; + final LinkedBlockingQueue<CacheEntry> readAheadEntries; + // request queue + final ConcurrentLinkedQueue<PendingReadRequest> readQueue; + + BKLogSegmentEntryReader(LogSegmentMetadata metadata, + LedgerHandle lh, + long startEntryId, + BookKeeper bk, + OrderedScheduler scheduler, + DistributedLogConfiguration conf) { + this.metadata = metadata; + this.lssn = metadata.getLogSegmentSequenceNumber(); + this.startSequenceId = metadata.getStartSequenceId(); + this.envelopeEntries = metadata.getEnvelopeEntries(); + this.lh = lh; + this.startEntryId = this.nextEntryId = 
Math.max(startEntryId, 0); + this.bk = bk; + this.conf = conf; + this.numPrefetchEntries = conf.getNumPrefetchEntriesPerLogSegment(); + this.maxPrefetchEntries = conf.getMaxPrefetchEntriesPerLogSegment(); + this.scheduler = scheduler; + this.openLedgerHandles = Lists.newArrayList(); + this.openLedgerHandles.add(lh); + this.outstandingLongPoll = null; + // create the readahead queue + this.readAheadEntries = new LinkedBlockingQueue<CacheEntry>(); + // create the read request queue + this.readQueue = new ConcurrentLinkedQueue<PendingReadRequest>(); + // read backoff settings + this.readAheadWaitTime = conf.getReadAheadWaitTime(); + this.maxReadBackoffTime = 4 * conf.getReadAheadWaitTime(); + } + + synchronized LedgerHandle getLh() { + return lh; + } + + synchronized LogSegmentMetadata getSegment() { + return metadata; + } + + @VisibleForTesting + synchronized long getNextEntryId() { + return nextEntryId; + } + + @Override + public void start() { + prefetchIfNecessary(); + } + + // + // Process on Log Segment Metadata Updates + // + + @Override + public synchronized void onLogSegmentMetadataUpdated(LogSegmentMetadata segment) { + if (metadata == segment || + LogSegmentMetadata.COMPARATOR.compare(metadata, segment) == 0 || + !(metadata.isInProgress() && !segment.isInProgress())) { + return; + } + // segment is closed from inprogress, then re-open the log segment + bk.asyncOpenLedger( + segment.getLedgerId(), + BookKeeper.DigestType.CRC32, + conf.getBKDigestPW().getBytes(UTF_8), + this, + segment); + } + + @Override + public void openComplete(int rc, LedgerHandle lh, Object ctx) { + LogSegmentMetadata segment = (LogSegmentMetadata) ctx; + if (BKException.Code.OK != rc) { + // fail current reader or retry opening the reader + failOrRetryOpenLedger(rc, segment); + return; + } + // switch to new ledger handle if the log segment is moved to completed. 
+ CacheEntry longPollRead = null; + synchronized (this) { + if (isClosed()) { + lh.asyncClose(new AsyncCallback.CloseCallback() { + @Override + public void closeComplete(int rc, LedgerHandle lh, Object ctx) { + logger.debug("Close the open ledger {} since the log segment reader is already closed", + lh.getId()); + } + }, null); + return; + } + this.metadata = segment; + this.lh = lh; + this.openLedgerHandles.add(lh); + longPollRead = outstandingLongPoll; + } + if (null != longPollRead) { + // reissue the long poll read when the log segment state is changed + issueRead(longPollRead); + } + // notify readers + notifyReaders(); + } + + private void failOrRetryOpenLedger(int rc, final LogSegmentMetadata segment) { + if (isClosed()) { + return; + } + if (isBeyondLastAddConfirmed()) { + // if the reader is already caught up, let's fail the reader immediately + // as we need to pull the latest metadata of this log segment. + setException(new BKTransmitException("Failed to open ledger for reading log segment " + getSegment(), rc), + true); + return; + } + // the reader is still catching up, retry opening the log segment later + scheduler.schedule(new Runnable() { + @Override + public void run() { + onLogSegmentMetadataUpdated(segment); + } + }, conf.getZKRetryBackoffStartMillis(), TimeUnit.MILLISECONDS); + } + + // + // Change the state of this reader + // + + private boolean checkClosedOrInError() { + if (null != lastException.get()) { + cancelAllPendingReads(lastException.get()); + return true; + } + return false; + } + + /** + * Put the reader into error state with the given <i>throwable</i>. Only the first exception is kept. + * + * @param throwable exception indicating the error + * @param isBackground whether the exception is set by a background read; if so, pending readers are notified + */ + private void setException(Throwable throwable, boolean isBackground) { + lastException.compareAndSet(null, throwable); + if (isBackground) { + notifyReaders(); + } + } + + /** + * Notify the readers with the state change.
+ */ + private void notifyReaders() { + processReadRequests(); + } + + private void cancelAllPendingReads(Throwable throwExc) { + for (PendingReadRequest request : readQueue) { + request.setException(throwExc); + } + readQueue.clear(); + } + + // + // Background Read Operations + // + + private void onReadEntryDone(boolean success) { + // an outstanding read has completed (successfully or not) + synchronized (this) { + --numOutstandingEntries; + } + // notify readers so they can consume the entry or observe the failure + notifyReaders(); + // only keep prefetching after a successful read; stop once a read has failed + if (success) { + prefetchIfNecessary(); + } + } + + private void onEntriesConsumed(int numEntries) { + synchronized (this) { + cachedEntries -= numEntries; + } + prefetchIfNecessary(); + } + + private void prefetchIfNecessary() { + List<CacheEntry> entriesToFetch; + synchronized (this) { + if (cachedEntries >= maxPrefetchEntries) { + return; + } + // we don't have enough entries, do prefetch + int numEntriesToFetch = numPrefetchEntries - numOutstandingEntries; + if (numEntriesToFetch <= 0) { + return; + } + entriesToFetch = new ArrayList<CacheEntry>(numEntriesToFetch); + for (int i = 0; i < numEntriesToFetch; i++) { + if (cachedEntries >= maxPrefetchEntries) { + break; + } + if ((isLedgerClosed() && nextEntryId > getLastAddConfirmed()) || + (!isLedgerClosed() && nextEntryId > getLastAddConfirmed() + 1)) { + break; + } + entriesToFetch.add(new CacheEntry(nextEntryId)); + ++numOutstandingEntries; + ++cachedEntries; + ++nextEntryId; + } + } + for (CacheEntry entry : entriesToFetch) { + readAheadEntries.add(entry); + issueRead(entry); + } + } + + + private void issueRead(CacheEntry cacheEntry) { + if (isClosed()) { + return; + } + if (isLedgerClosed()) { + if (isNotBeyondLastAddConfirmed(cacheEntry.getEntryId())) { + issueSimpleRead(cacheEntry); + return; + } else { + // reached the end of the closed ledger; notify readers so they can detect the end of the log segment + notifyReaders(); + } + } else { // the ledger is still in progress + if (isNotBeyondLastAddConfirmed(cacheEntry.getEntryId())) { +
issueSimpleRead(cacheEntry); + } else { + issueLongPollRead(cacheEntry); + } + } + } + + private void issueSimpleRead(CacheEntry cacheEntry) { + getLh().asyncReadEntries(cacheEntry.entryId, cacheEntry.entryId, cacheEntry, null); + } + + private void issueLongPollRead(CacheEntry cacheEntry) { + // register the read as outstanding reads + synchronized (this) { + this.outstandingLongPoll = cacheEntry; + } + getLh().asyncReadLastConfirmedAndEntry( + cacheEntry.entryId, + conf.getReadLACLongPollTimeout(), + false, + cacheEntry, + null); + } + + // + // Foreground Read Operations + // + + Entry.Reader processReadEntry(LedgerEntry entry) throws IOException { + return Entry.newBuilder() + .setLogSegmentInfo(lssn, startSequenceId) + .setEntryId(entry.getEntryId()) + .setEnvelopeEntry(envelopeEntries) + .deserializeRecordSet(false) + .setInputStream(entry.getEntryInputStream()) + .buildReader(); + } + + @Override + public Future<List<Entry.Reader>> readNext(int numEntries) { + final PendingReadRequest readRequest = new PendingReadRequest(numEntries); + + if (checkClosedOrInError()) { + readRequest.setException(lastException.get()); + } else { + boolean wasQueueEmpty; + synchronized (readQueue) { + wasQueueEmpty = readQueue.isEmpty(); + readQueue.add(readRequest); + } + if (wasQueueEmpty) { + processReadRequests(); + } + } + return readRequest.getPromise(); + } + + private void processReadRequests() { + if (isClosed()) { + // the reader is already closed. 
+ return; + } + + long prevCount = scheduleCount.getAndIncrement(); + if (0 == prevCount) { + scheduler.submit(this); + } + } + + /** + * The core function to propagate fetched entries to read requests + */ + @Override + public void run() { + long scheduleCountLocal = scheduleCount.get(); + while (true) { + PendingReadRequest nextRequest = null; + synchronized (readQueue) { + nextRequest = readQueue.peek(); + } + + // if read queue is empty, nothing to read, return + if (null == nextRequest) { + scheduleCount.set(0L); + return; + } + + // if the oldest pending promise is interrupted then we must + // mark the reader in error and abort all pending reads since + // we don't know the last consumed read + if (null == lastException.get()) { + if (nextRequest.getPromise().isInterrupted().isDefined()) { + setException(new DLInterruptedException("Interrupted on reading log segment " + + getSegment() + " : " + nextRequest.getPromise().isInterrupted().get()), false); + } + } + + // if the reader is in error state, stop read + if (checkClosedOrInError()) { + return; + } + + // read entries from readahead cache to satisfy next read request + readEntriesFromReadAheadCache(nextRequest); + + // check if we can satisfy the read request + if (nextRequest.hasReadEntries()) { + PendingReadRequest request; + synchronized (readQueue) { + request = readQueue.poll(); + } + if (null != request && nextRequest == request) { + request.complete(); + } else { + DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at reading from " + + getSegment()); + nextRequest.setException(ise); + if (null != request) { + request.setException(ise); + } + setException(ise, false); + } + } else { + if (0 == scheduleCountLocal) { + return; + } + scheduleCountLocal = scheduleCount.decrementAndGet(); + } + } + } + + private void readEntriesFromReadAheadCache(PendingReadRequest nextRequest) { + while (!nextRequest.hasReadEnoughEntries()) { + CacheEntry entry = readAheadEntries.peek(); + 
// no entry available in the read ahead cache; re-peek after the end-of-segment check because readahead may enqueue the final entries (and advance nextEntryId) between the first peek and that check - NOTE(review): assumes an entry is enqueued no later than nextEntryId moves past it + if (null == entry) { + if (isEndOfLogSegment() && null == readAheadEntries.peek()) { + setException(new EndOfLogSegmentException(getSegment().getZNodeName()), false); + } + return; + } + // entry is not complete yet. + if (!entry.isDone()) { + // we already reached end of the log segment + if (isEndOfLogSegment(entry.getEntryId())) { + setException(new EndOfLogSegmentException(getSegment().getZNodeName()), false); + } + return; + } + if (entry.isSuccess()) { + CacheEntry removedEntry = readAheadEntries.poll(); + if (entry != removedEntry) { + DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at reading from " + + getSegment()); + setException(ise, false); + return; + } + try { + nextRequest.addEntry(processReadEntry(entry.getEntry())); + } catch (IOException e) { + setException(e, false); + return; + } + } else { + setException(new BKTransmitException("Encountered issue on reading entry " + entry.getEntryId() + + " @ log segment " + getSegment(), entry.getRc()), false); + return; + } + } + } + + // + // State Management + // + + private synchronized boolean isEndOfLogSegment() { + return isEndOfLogSegment(nextEntryId); + } + + private boolean isEndOfLogSegment(long entryId) { + return isLedgerClosed() && entryId > getLastAddConfirmed(); + } + + private synchronized boolean isBeyondLastAddConfirmed() { + return isBeyondLastAddConfirmed(nextEntryId); + } + + private boolean isBeyondLastAddConfirmed(long entryId) { + return entryId > getLastAddConfirmed(); + } + + private synchronized boolean isNotBeyondLastAddConfirmed() { + return isNotBeyondLastAddConfirmed(nextEntryId); + } + + private boolean isNotBeyondLastAddConfirmed(long entryId) { + return entryId <= getLastAddConfirmed(); + } + + private boolean isLedgerClosed() { + return getLh().isClosed(); + } + + @Override + public long getLastAddConfirmed() { + return getLh().getLastAddConfirmed(); + } + + synchronized boolean isClosed() { + return null != closePromise; + } + 
+ @Override + public Future<Void> asyncClose() { // idempotent: once closing has started, later calls return the same close promise + final Promise<Void> closeFuture; + ReadCancelledException exception; + LedgerHandle[] lhsToClose; + synchronized (this) { + if (null != closePromise) { + return closePromise; + } + closeFuture = closePromise = new Promise<Void>(); + lhsToClose = openLedgerHandles.toArray(new LedgerHandle[openLedgerHandles.size()]); + // set the exception to cancel pending and subsequent reads + exception = new ReadCancelledException(getSegment().getZNodeName(), "Reader was closed"); + setException(exception, false); + } + + // cancel all pending reads (done after releasing the reader's monitor) + cancelAllPendingReads(exception); + + // close all the open ledgers; closeFuture is satisfied with the combined close result + BKUtils.closeLedgers(lhsToClose).proxyTo(closeFuture); + return closeFuture; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java new file mode 100644 index 0000000..34fe1c3 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.impl.logsegment; + +import com.google.common.annotations.VisibleForTesting; +import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; +import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.LedgerHandle; + +/** + * Log segment entry writer backed by a single BookKeeper ledger; every operation delegates to the wrapped {@link LedgerHandle}. + */ +public class BKLogSegmentEntryWriter implements LogSegmentEntryWriter { + + private final LedgerHandle lh; + + public BKLogSegmentEntryWriter(LedgerHandle lh) { + this.lh = lh; + } + + @VisibleForTesting + public LedgerHandle getLedgerHandle() { + return this.lh; + } + + @Override + public long getLogSegmentId() { // the ledger id is used as the log segment id + return lh.getId(); + } + + @Override + public void asyncClose(AsyncCallback.CloseCallback callback, Object ctx) { + lh.asyncClose(callback, ctx); + } + + @Override + public void asyncAddEntry(byte[] data, int offset, int length, + AsyncCallback.AddCallback callback, Object ctx) { + lh.asyncAddEntry(data, offset, length, callback, ctx); + } + + @Override + public long size() { // reports LedgerHandle#getLength() of the wrapped ledger + return lh.getLength(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java new file mode 100644 index 0000000..c71c67e --- /dev/null +++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.impl.logsegment; + +import com.google.common.collect.Lists; +import com.twitter.distributedlog.function.VoidFunctions; +import com.twitter.distributedlog.util.FutureUtils; +import com.twitter.util.Future; +import com.twitter.util.Futures; +import com.twitter.util.Promise; +import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.LedgerHandle; + +import java.util.List; + +/** + * Utility functions for working with BookKeeper ledger handles. + */ +public class BKUtils { + + /** + * Asynchronously close a ledger <i>lh</i>. + * + * @param lh ledger handle to close + * @return future satisfied when the ledger is closed, or failed with the {@link BKException} created from a non-OK close code.
+ */ + public static Future<Void> closeLedger(LedgerHandle lh) { + final Promise<Void> closePromise = new Promise<Void>(); + lh.asyncClose(new AsyncCallback.CloseCallback() { + @Override + public void closeComplete(int rc, LedgerHandle lh, Object ctx) { // note: this lh shadows the method parameter + if (BKException.Code.OK != rc) { + FutureUtils.setException(closePromise, BKException.create(rc)); + } else { + FutureUtils.setValue(closePromise, null); + } + } + }, null); + return closePromise; + } + + /** + * Close a list of ledgers <i>lhs</i>; all closes are issued before any result is awaited. + * + * @param lhs the ledgers to close + * @return future combining the individual close results via {@link Futures#collect}; it fails if any close fails. + */ + public static Future<Void> closeLedgers(LedgerHandle ... lhs) { + List<Future<Void>> closeResults = Lists.newArrayListWithExpectedSize(lhs.length); + for (LedgerHandle lh : lhs) { + closeResults.add(closeLedger(lh)); + } + return Futures.collect(closeResults).map(VoidFunctions.LIST_TO_VOID_FUNC); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java new file mode 100644 index 0000000..d43f3d8 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.logsegment; + +import com.google.common.annotations.Beta; +import com.twitter.distributedlog.Entry; +import com.twitter.distributedlog.LogSegmentMetadata; +import com.twitter.distributedlog.io.AsyncCloseable; +import com.twitter.util.Future; + +import java.util.List; + +/** + * An interface class to read the enveloped entry (serialized bytes of + * {@link com.twitter.distributedlog.Entry}) from a log segment. + */ +@Beta +public interface LogSegmentEntryReader extends AsyncCloseable { + + /** + * Start the reader. This signals the implementation + * to start preparing data for consumption via {@link #readNext(int)}. + */ + void start(); + + /** + * Notify the reader that the metadata of its log segment has changed. + * + * @param segment new metadata of the log segment. + */ + void onLogSegmentMetadataUpdated(LogSegmentMetadata segment); + + /** + * Read next <i>numEntries</i> entries from current log segment. + * <p> + * <i>numEntries</i> is best-effort. + * + * @param numEntries num entries to read from current log segment + * @return A promise that when satisfied will contain a non-empty list of entries with their content. + * The promise fails with {@link com.twitter.distributedlog.exceptions.EndOfLogSegmentException} + * when reading entries beyond the end of a <i>closed</i> log segment. + */ + Future<List<Entry.Reader>> readNext(int numEntries); + + /** + * Return the last add confirmed entry id (LAC). + * + * @return the last add confirmed entry id.
+ */ + long getLastAddConfirmed(); + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java new file mode 100644 index 0000000..ff47691 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.logsegment; + +import com.google.common.annotations.Beta; +import com.twitter.distributedlog.LogSegmentMetadata; +import com.twitter.util.Future; + +/** + * Store that opens entry writers and entry readers for log segments. + */ +@Beta +public interface LogSegmentEntryStore { + + /** + * Open the writer for writing data to the log <i>segment</i>.
+ * + * @param segment the log <i>segment</i> to write data to + * @return future representing the opened writer + */ + Future<LogSegmentEntryWriter> openWriter(LogSegmentMetadata segment); + + /** + * Open the reader for reading data from the log <i>segment</i>. + * + * @param segment the log <i>segment</i> to read data from + * @return future representing the opened reader + */ + Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment); + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java index 1485ae6..588c366 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java @@ -17,7 +17,7 @@ */ package com.twitter.distributedlog; -import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter; +import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter; import com.twitter.distributedlog.logsegment.LogSegmentFilter; import com.twitter.distributedlog.metadata.BKDLConfig; import com.twitter.distributedlog.metadata.DLMetadata; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java index 7e497c4..b350255 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java +++
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java @@ -21,7 +21,7 @@ import com.twitter.distributedlog.exceptions.BKTransmitException; import com.twitter.distributedlog.exceptions.EndOfStreamException; import com.twitter.distributedlog.exceptions.WriteCancelledException; import com.twitter.distributedlog.exceptions.WriteException; -import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter; +import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter; import com.twitter.distributedlog.io.Abortables; import com.twitter.distributedlog.lock.SessionLockFactory; import com.twitter.distributedlog.lock.ZKDistributedLock; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java index 8a734b5..a388b68 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java @@ -19,7 +19,7 @@ package com.twitter.distributedlog; import static org.junit.Assert.assertTrue; -import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter; +import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter; import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.namespace.DistributedLogNamespace; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java 
---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java new file mode 100644 index 0000000..9a194c4 --- /dev/null +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java @@ -0,0 +1,553 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.twitter.distributedlog.impl.logsegment; + +import com.google.common.collect.Lists; +import com.twitter.distributedlog.AsyncLogWriter; +import com.twitter.distributedlog.BookKeeperClient; +import com.twitter.distributedlog.BookKeeperClientBuilder; +import com.twitter.distributedlog.DLMTestUtil; +import com.twitter.distributedlog.DLSN; +import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.DistributedLogManager; +import com.twitter.distributedlog.Entry; +import com.twitter.distributedlog.LogRecord; +import com.twitter.distributedlog.LogRecordWithDLSN; +import com.twitter.distributedlog.LogSegmentMetadata; +import com.twitter.distributedlog.TestDistributedLogBase; +import com.twitter.distributedlog.exceptions.EndOfLogSegmentException; +import com.twitter.distributedlog.exceptions.ReadCancelledException; +import com.twitter.distributedlog.util.FutureUtils; +import com.twitter.distributedlog.util.OrderedScheduler; +import com.twitter.distributedlog.util.Utils; +import com.twitter.util.Future; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerHandle; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.*; + +/** + * Test Case for {@link BKLogSegmentEntryReader} + */ +public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { + + @Rule + public TestName runtime = new TestName(); + private OrderedScheduler scheduler; + private BookKeeperClient bkc; + + @Before + public void setup() throws Exception { + super.setup(); + bkc = BookKeeperClientBuilder.newBuilder() + .name("test-bk") + .dlConfig(conf) + .ledgersPath("/ledgers") + .zkServers(bkutil.getZkServers()) + 
.build(); + scheduler = OrderedScheduler.newBuilder() + .name("test-bk-logsegment-entry-reader") + .corePoolSize(1) + .build(); + } + + @After + public void teardown() throws Exception { + if (null != bkc) { + bkc.close(); + } + if (null != scheduler) { + scheduler.shutdown(); + } + super.teardown(); + } + + BKLogSegmentEntryReader createEntryReader(LogSegmentMetadata segment, + long startEntryId, + DistributedLogConfiguration conf) + throws Exception { + LedgerHandle lh; + if (segment.isInProgress()) { + lh = bkc.get().openLedgerNoRecovery( + segment.getLedgerId(), + BookKeeper.DigestType.CRC32, + conf.getBKDigestPW().getBytes(UTF_8)); + } else { + lh = bkc.get().openLedger( + segment.getLedgerId(), + BookKeeper.DigestType.CRC32, + conf.getBKDigestPW().getBytes(UTF_8)); + } + return new BKLogSegmentEntryReader( + segment, + lh, + startEntryId, + bkc.get(), + scheduler, + conf); + } + + void generateCompletedLogSegments(DistributedLogManager dlm, + DistributedLogConfiguration conf, + long numCompletedSegments, + long segmentSize) throws Exception { + long txid = 1L; + for (long i = 0; i < numCompletedSegments; i++) { + AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter()); + for (long j = 1; j <= segmentSize; j++) { + FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); + LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(txid); + ctrlRecord.setControl(); + FutureUtils.result(writer.write(ctrlRecord)); + } + Utils.close(writer); + } + } + + AsyncLogWriter createInprogressLogSegment(DistributedLogManager dlm, + DistributedLogConfiguration conf, + long segmentSize) throws Exception { + AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter()); + for (long i = 1L; i <= segmentSize; i++) { + FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(i))); + LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(i); + ctrlRecord.setControl(); + FutureUtils.result(writer.write(ctrlRecord)); + } + return writer; 
+ } + + @Test(timeout = 60000) + public void testReadEntriesFromCompleteLogSegment() throws Exception { + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(conf); + confLocal.setOutputBufferSize(0); + confLocal.setPeriodicFlushFrequencyMilliSeconds(0); + confLocal.setImmediateFlushEnabled(false); + confLocal.setNumPrefetchEntriesPerLogSegment(10); + confLocal.setMaxPrefetchEntriesPerLogSegment(10); + DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName()); + generateCompletedLogSegments(dlm, confLocal, 1, 20); + List<LogSegmentMetadata> segments = dlm.getLogSegments(); + assertEquals(segments.size() + " log segments found, expected to be only one", + 1, segments.size()); + + BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal); + reader.start(); + boolean done = false; + long txId = 1L; + long entryId = 0L; + while (!done) { + Entry.Reader entryReader; + try { + entryReader = FutureUtils.result(reader.readNext(1)).get(0); + } catch (EndOfLogSegmentException eol) { + done = true; + continue; + } + LogRecordWithDLSN record = entryReader.nextRecord(); + while (null != record) { + if (!record.isControl()) { + DLMTestUtil.verifyLogRecord(record); + assertEquals(txId, record.getTransactionId()); + ++txId; + } + DLSN dlsn = record.getDlsn(); + assertEquals(1L, dlsn.getLogSegmentSequenceNo()); + assertEquals(entryId, dlsn.getEntryId()); + record = entryReader.nextRecord(); + } + ++entryId; + } + assertEquals(21, txId); + Utils.close(reader); + } + + @Test(timeout = 60000) + public void testCloseReaderToCancelPendingReads() throws Exception { + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(conf); + confLocal.setNumPrefetchEntriesPerLogSegment(10); + confLocal.setMaxPrefetchEntriesPerLogSegment(10); + DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName()); + 
DLMTestUtil.generateCompletedLogSegments(dlm, confLocal, 1, 20); + List<LogSegmentMetadata> segments = dlm.getLogSegments(); + assertEquals(segments.size() + " log segments found, expected to be only one", + 1, segments.size()); + + BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal); + List<Future<List<Entry.Reader>>> futures = Lists.newArrayList(); + for (int i = 0; i < 5; i++) { + futures.add(reader.readNext(1)); + } + assertFalse("Reader should not be closed yet", reader.isClosed()); + Utils.close(reader); + for (Future<List<Entry.Reader>> future : futures) { + try { + FutureUtils.result(future); + fail("The read request should be cancelled"); + } catch (ReadCancelledException rce) { + // expected + } + } + assertTrue("Reader should be closed yet", reader.isClosed()); + } + + @Test(timeout = 60000) + public void testMaxPrefetchEntriesSmallBatch() throws Exception { + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(conf); + confLocal.setOutputBufferSize(0); + confLocal.setPeriodicFlushFrequencyMilliSeconds(0); + confLocal.setImmediateFlushEnabled(false); + confLocal.setNumPrefetchEntriesPerLogSegment(2); + confLocal.setMaxPrefetchEntriesPerLogSegment(10); + DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName()); + generateCompletedLogSegments(dlm, confLocal, 1, 20); + List<LogSegmentMetadata> segments = dlm.getLogSegments(); + assertEquals(segments.size() + " log segments found, expected to be only one", + 1, segments.size()); + + BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal); + reader.start(); + + // wait for the read ahead entries to become available + while (reader.readAheadEntries.size() < 10) { + TimeUnit.MILLISECONDS.sleep(10); + } + + long txId = 1L; + long entryId = 0L; + + + assertEquals(10, reader.readAheadEntries.size()); + assertEquals(10, reader.getNextEntryId()); + // read first entry + Entry.Reader 
entryReader = FutureUtils.result(reader.readNext(1)).get(0); + LogRecordWithDLSN record = entryReader.nextRecord(); + while (null != record) { + if (!record.isControl()) { + DLMTestUtil.verifyLogRecord(record); + assertEquals(txId, record.getTransactionId()); + ++txId; + } + DLSN dlsn = record.getDlsn(); + assertEquals(1L, dlsn.getLogSegmentSequenceNo()); + assertEquals(entryId, dlsn.getEntryId()); + record = entryReader.nextRecord(); + } + ++entryId; + assertEquals(2L, txId); + // wait for the read ahead entries to become 10 again + while (reader.readAheadEntries.size() < 10) { + TimeUnit.MILLISECONDS.sleep(10); + } + + assertEquals(10, reader.readAheadEntries.size()); + assertEquals(11, reader.getNextEntryId()); + + Utils.close(reader); + } + + @Test(timeout = 60000) + public void testMaxPrefetchEntriesLargeBatch() throws Exception { + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(conf); + confLocal.setOutputBufferSize(0); + confLocal.setPeriodicFlushFrequencyMilliSeconds(0); + confLocal.setImmediateFlushEnabled(false); + confLocal.setNumPrefetchEntriesPerLogSegment(10); + confLocal.setMaxPrefetchEntriesPerLogSegment(5); + DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName()); + generateCompletedLogSegments(dlm, confLocal, 1, 20); + List<LogSegmentMetadata> segments = dlm.getLogSegments(); + assertEquals(segments.size() + " log segments found, expected to be only one", + 1, segments.size()); + + BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal); + reader.start(); + + // wait for the read ahead entries to become available + while (reader.readAheadEntries.size() < 5) { + TimeUnit.MILLISECONDS.sleep(10); + } + + long txId = 1L; + long entryId = 0L; + + assertEquals(5, reader.readAheadEntries.size()); + assertEquals(5, reader.getNextEntryId()); + // read first entry + Entry.Reader entryReader = FutureUtils.result(reader.readNext(1)).get(0); + 
LogRecordWithDLSN record = entryReader.nextRecord(); + while (null != record) { + if (!record.isControl()) { + DLMTestUtil.verifyLogRecord(record); + assertEquals(txId, record.getTransactionId()); + ++txId; + } + DLSN dlsn = record.getDlsn(); + assertEquals(1L, dlsn.getLogSegmentSequenceNo()); + assertEquals(entryId, dlsn.getEntryId()); + record = entryReader.nextRecord(); + } + ++entryId; + assertEquals(2L, txId); + // wait for the read ahead entries to become 5 again + while (reader.readAheadEntries.size() < 5) { + TimeUnit.MILLISECONDS.sleep(10); + } + + assertEquals(5, reader.readAheadEntries.size()); + assertEquals(6, reader.getNextEntryId()); + + Utils.close(reader); + } + + @Test(timeout = 60000) + public void testMaxPrefetchEntriesSmallSegment() throws Exception { + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(conf); + confLocal.setOutputBufferSize(0); + confLocal.setPeriodicFlushFrequencyMilliSeconds(0); + confLocal.setImmediateFlushEnabled(false); + confLocal.setNumPrefetchEntriesPerLogSegment(10); + confLocal.setMaxPrefetchEntriesPerLogSegment(20); + DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName()); + generateCompletedLogSegments(dlm, confLocal, 1, 5); + List<LogSegmentMetadata> segments = dlm.getLogSegments(); + assertEquals(segments.size() + " log segments found, expected to be only one", + 1, segments.size()); + + BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal); + reader.start(); + + // wait for the read ahead entries to become available + while (reader.readAheadEntries.size() < (reader.getLastAddConfirmed() + 1)) { + TimeUnit.MILLISECONDS.sleep(10); + } + + long txId = 1L; + long entryId = 0L; + + assertEquals((reader.getLastAddConfirmed() + 1), reader.readAheadEntries.size()); + assertEquals((reader.getLastAddConfirmed() + 1), reader.getNextEntryId()); + // read first entry + Entry.Reader entryReader = 
FutureUtils.result(reader.readNext(1)).get(0); + LogRecordWithDLSN record = entryReader.nextRecord(); + while (null != record) { + if (!record.isControl()) { + DLMTestUtil.verifyLogRecord(record); + assertEquals(txId, record.getTransactionId()); + ++txId; + } + DLSN dlsn = record.getDlsn(); + assertEquals(1L, dlsn.getLogSegmentSequenceNo()); + assertEquals(entryId, dlsn.getEntryId()); + record = entryReader.nextRecord(); + } + ++entryId; + assertEquals(2L, txId); + assertEquals(reader.getLastAddConfirmed(), reader.readAheadEntries.size()); + assertEquals((reader.getLastAddConfirmed() + 1), reader.getNextEntryId()); + + Utils.close(reader); + } + + @Test(timeout = 60000) + public void testReadEntriesFromInprogressSegment() throws Exception { + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(conf); + confLocal.setOutputBufferSize(0); + confLocal.setPeriodicFlushFrequencyMilliSeconds(0); + confLocal.setImmediateFlushEnabled(false); + confLocal.setNumPrefetchEntriesPerLogSegment(20); + confLocal.setMaxPrefetchEntriesPerLogSegment(20); + DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName()); + AsyncLogWriter writer = createInprogressLogSegment(dlm, confLocal, 5); + List<LogSegmentMetadata> segments = dlm.getLogSegments(); + assertEquals(segments.size() + " log segments found, expected to be only one", + 1, segments.size()); + + BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal); + reader.start(); + + long expectedLastAddConfirmed = 8L; + // wait until all prefetch requests have been sent out + while (reader.readAheadEntries.size() < expectedLastAddConfirmed + 2) { + TimeUnit.MILLISECONDS.sleep(10); + } + assertEquals(expectedLastAddConfirmed + 2, reader.getNextEntryId()); + + long txId = 1L; + long entryId = 0L; + while (true) { + Entry.Reader entryReader = FutureUtils.result(reader.readNext(1)).get(0); + LogRecordWithDLSN record = entryReader.nextRecord(); + while 
(null != record) { + if (!record.isControl()) { + DLMTestUtil.verifyLogRecord(record); + assertEquals(txId, record.getTransactionId()); + ++txId; + } + DLSN dlsn = record.getDlsn(); + assertEquals(1L, dlsn.getLogSegmentSequenceNo()); + assertEquals(entryId, dlsn.getEntryId()); + record = entryReader.nextRecord(); + } + ++entryId; + if (entryId == expectedLastAddConfirmed + 1) { + break; + } + } + assertEquals(6L, txId); + + Future<List<Entry.Reader>> nextReadFuture = reader.readNext(1); + // write another record to commit previous writes + FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txId))); + // the long poll will be satisfied + List<Entry.Reader> nextReadEntries = FutureUtils.result(nextReadFuture); + assertEquals(1, nextReadEntries.size()); + Entry.Reader entryReader = nextReadEntries.get(0); + LogRecordWithDLSN record = entryReader.nextRecord(); + assertNotNull(record); + assertTrue(record.isControl()); + assertNull(entryReader.nextRecord()); + // once the read is advanced, the next entry will be prefetched + while (reader.getNextEntryId() <= entryId) { + TimeUnit.MILLISECONDS.sleep(10); + } + assertEquals(entryId + 2, reader.getNextEntryId()); + assertEquals(1, reader.readAheadEntries.size()); + + Utils.close(reader); + Utils.close(writer); + } + + @Test(timeout = 60000) + public void testReadEntriesOnStateChange() throws Exception { + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(conf); + confLocal.setOutputBufferSize(0); + confLocal.setPeriodicFlushFrequencyMilliSeconds(0); + confLocal.setImmediateFlushEnabled(false); + confLocal.setNumPrefetchEntriesPerLogSegment(20); + confLocal.setMaxPrefetchEntriesPerLogSegment(20); + DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName()); + AsyncLogWriter writer = createInprogressLogSegment(dlm, confLocal, 5); + List<LogSegmentMetadata> segments = dlm.getLogSegments(); + assertEquals(segments.size() + " log segments found, 
expected to be only one", + 1, segments.size()); + + BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal); + reader.start(); + + long expectedLastAddConfirmed = 8L; + // wait until all prefetch requests have been sent out + while (reader.readAheadEntries.size() < expectedLastAddConfirmed + 2) { + TimeUnit.MILLISECONDS.sleep(10); + } + assertEquals(expectedLastAddConfirmed + 2, reader.getNextEntryId()); + + long txId = 1L; + long entryId = 0L; + while (true) { + Entry.Reader entryReader = FutureUtils.result(reader.readNext(1)).get(0); + LogRecordWithDLSN record = entryReader.nextRecord(); + while (null != record) { + if (!record.isControl()) { + DLMTestUtil.verifyLogRecord(record); + assertEquals(txId, record.getTransactionId()); + ++txId; + } + DLSN dlsn = record.getDlsn(); + assertEquals(1L, dlsn.getLogSegmentSequenceNo()); + assertEquals(entryId, dlsn.getEntryId()); + record = entryReader.nextRecord(); + } + ++entryId; + if (entryId == expectedLastAddConfirmed + 1) { + break; + } + } + assertEquals(6L, txId); + + Future<List<Entry.Reader>> nextReadFuture = reader.readNext(1); + // write another record to commit previous writes + FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txId))); + // the long poll will be satisfied + List<Entry.Reader> nextReadEntries = FutureUtils.result(nextReadFuture); + assertEquals(1, nextReadEntries.size()); + Entry.Reader entryReader = nextReadEntries.get(0); + LogRecordWithDLSN record = entryReader.nextRecord(); + assertNotNull(record); + assertTrue(record.isControl()); + assertNull(entryReader.nextRecord()); + // once the read is advanced, the next entry will be prefetched + while (reader.getNextEntryId() <= entryId) { + TimeUnit.MILLISECONDS.sleep(10); + } + assertEquals(entryId + 2, reader.getNextEntryId()); + assertEquals(1, reader.readAheadEntries.size()); + + // advance the entry id + ++entryId; + // closing the writer commits the pending write + Utils.close(writer); + entryReader = 
FutureUtils.result(reader.readNext(1)).get(0); + record = entryReader.nextRecord(); + assertNotNull(record); + assertFalse(record.isControl()); + assertNull(entryReader.nextRecord()); + while (reader.getNextEntryId() <= entryId + 1) { + TimeUnit.MILLISECONDS.sleep(10); + } + assertEquals(entryId + 2, reader.getNextEntryId()); + assertEquals(1, reader.readAheadEntries.size()); + + // get the new log segment + List<LogSegmentMetadata> newSegments = dlm.getLogSegments(); + assertEquals(1, newSegments.size()); + assertFalse(newSegments.get(0).isInProgress()); + reader.onLogSegmentMetadataUpdated(newSegments.get(0)); + // once the reader receives the completed log segment metadata, the outstanding long poll + // should be cancelled and end of log segment should be signaled correctly + try { + // closing the log segment writes another control record, so the first read is expected + // to return that record and the second read to signal end of log segment. + FutureUtils.result(reader.readNext(1)); + FutureUtils.result(reader.readNext(1)); + fail("Should reach end of log segment"); + } catch (EndOfLogSegmentException eol) { + // expected + } + Utils.close(reader); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/EndOfLogSegmentException.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/EndOfLogSegmentException.java b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/EndOfLogSegmentException.java new file mode 100644 index 0000000..ae4aa45 --- /dev/null +++ b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/EndOfLogSegmentException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.exceptions; + +import com.twitter.distributedlog.thrift.service.StatusCode; + +/** + * Exception thrown when a reader reaches the end of a log segment. + */ +public class EndOfLogSegmentException extends DLException { + + private static final long serialVersionUID = 6060419315910178451L; + + public EndOfLogSegmentException(String logSegmentName) { + super(StatusCode.END_OF_LOG_SEGMENT, "end of log segment " + logSegmentName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-protocol/src/main/thrift/service.thrift ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/thrift/service.thrift b/distributedlog-protocol/src/main/thrift/service.thrift index a25af63..a67664c 100644 --- a/distributedlog-protocol/src/main/thrift/service.thrift +++ b/distributedlog-protocol/src/main/thrift/service.thrift @@ -96,6 +96,8 @@ enum StatusCode { TOO_MANY_STREAMS = 524, /* Log Segment Not Found */ LOG_SEGMENT_NOT_FOUND = 525, + /* End of Log Segment */ + END_OF_LOG_SEGMENT = 526, /* 6xx: unexpected */