http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
new file mode 100644
index 0000000..573ae5c
--- /dev/null
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Ticker;
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.exceptions.AlreadyTruncatedTransactionException;
+import com.twitter.distributedlog.exceptions.DLIllegalStateException;
+import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.distributedlog.util.Utils;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test Case {@link ReadAheadEntryReader}
+ */
+public class TestReadAheadEntryReader extends TestDistributedLogBase {
+
+    private static final int MAX_CACHED_ENTRIES = 5;
+    private static final int NUM_PREFETCH_ENTRIES = 10;
+
+    @Rule
+    public TestName runtime = new TestName();
+    private DistributedLogConfiguration baseConf;
+    private OrderedScheduler scheduler;
+    private BookKeeperClient bkc;
+
+    /**
+     * Builds the base configuration, the BookKeeper client and the scheduler shared by the tests.
+     */
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        baseConf = new DistributedLogConfiguration();
+        baseConf.addConfiguration(conf);
+        baseConf.setOutputBufferSize(0);
+        baseConf.setPeriodicFlushFrequencyMilliSeconds(0);
+        baseConf.setImmediateFlushEnabled(false);
+        baseConf.setReadAheadMaxRecords(MAX_CACHED_ENTRIES);
+        baseConf.setNumPrefetchEntriesPerLogSegment(NUM_PREFETCH_ENTRIES);
+        baseConf.setMaxPrefetchEntriesPerLogSegment(NUM_PREFETCH_ENTRIES);
+        bkc = BookKeeperClientBuilder.newBuilder()
+                .name("test-bk")
+                .dlConfig(conf)
+                .ledgersPath("/ledgers")
+                .zkServers(bkutil.getZkServers())
+                .build();
+        scheduler = OrderedScheduler.newBuilder()
+                .name("test-read-ahead-entry-reader")
+                .corePoolSize(1)
+                .build();
+    }
+
+    /**
+     * Closes the BookKeeper client and shuts down the scheduler created in {@link #setup()}.
+     */
+    @After
+    public void teardown() throws Exception {
+        if (null != bkc) {
+            bkc.close();
+        }
+        if (null != scheduler) {
+            scheduler.shutdown();
+        }
+        super.teardown();
+    }
+
+    /**
+     * Creates a {@link ReadAheadEntryReader} for <i>streamName</i> starting at <i>fromDLSN</i>.
+     * It uses a read handler created from <i>dlm</i> and an entry store built on this test's
+     * BookKeeper client and scheduler. Callers are expected to close the returned reader.
+     */
+    private ReadAheadEntryReader createEntryReader(String streamName,
+                                                   DLSN fromDLSN,
+                                                   BKDistributedLogManager dlm,
+                                                   DistributedLogConfiguration conf)
+            throws Exception {
+        BKLogReadHandler readHandler = dlm.createReadHandler(
+                Optional.<String>absent(),
+                true);
+        LogSegmentEntryStore entryStore = new BKLogSegmentEntryStore(
+                conf,
+                bkc.get(),
+                scheduler,
+                NullStatsLogger.INSTANCE,
+                AsyncFailureInjector.NULL);
+        return new ReadAheadEntryReader(
+                streamName,
+                fromDLSN,
+                conf,
+                readHandler,
+                entryStore,
+                scheduler,
+                Ticker.systemTicker(),
+                new AlertStatsLogger(NullStatsLogger.INSTANCE, "test-alert"));
+    }
+
+    /**
+     * Submits a task keyed by <i>streamName</i> to the scheduler and blocks until it has run.
+     * Tests use it as a barrier; this presumably relies on per-key ordering in the scheduler.
+     */
+    private void ensureOrderSchedulerEmpty(String streamName) throws Exception {
+        final Promise<Void> promise = new Promise<Void>();
+        scheduler.submit(streamName, new Runnable() {
+            @Override
+            public void run() {
+                FutureUtils.setValue(promise, null);
+            }
+        });
+        FutureUtils.result(promise);
+    }
+
+    /**
+     * Same as the four-argument overload, with transaction ids starting at 1.
+     */
+    void generateCompletedLogSegments(DistributedLogManager dlm,
+                                      long numCompletedSegments,
+                                      long segmentSize) throws Exception {
+        generateCompletedLogSegments(dlm, numCompletedSegments, segmentSize, 1L);
+    }
+
+    /**
+     * Opens and closes one writer <i>numCompletedSegments</i> times. Each writer gets
+     * <i>segmentSize</i> user records, every one followed by a control record.
+     */
+    void generateCompletedLogSegments(DistributedLogManager dlm,
+                                      long numCompletedSegments,
+                                      long segmentSize,
+                                      long startTxId) throws Exception {
+
+        long txid = startTxId;
+        for (long i = 0; i < numCompletedSegments; i++) {
+            AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+            for (long j = 1; j <= segmentSize; j++) {
+                FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                // the control record reuses the next txid without consuming it
+                LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(txid);
+                ctrlRecord.setControl();
+                FutureUtils.result(writer.write(ctrlRecord));
+            }
+            Utils.close(writer);
+        }
+    }
+
+    /**
+     * Opens a writer and writes <i>segmentSize</i> user records (transaction ids from 1), each
+     * followed by a control record with the same transaction id. The writer is returned open,
+     * so the caller has to close it. The <i>conf</i> argument is currently unused.
+     */
+    AsyncLogWriter createInprogressLogSegment(DistributedLogManager dlm,
+                                              DistributedLogConfiguration conf,
+                                              long segmentSize) throws Exception {
+        AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+        for (long i = 1L; i <= segmentSize; i++) {
+            FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(i)));
+            LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(i);
+            ctrlRecord.setControl();
+            FutureUtils.result(writer.write(ctrlRecord));
+        }
+        return writer;
+    }
+
+    /**
+     * Expects {@code checkLastException()} to throw {@link AlreadyTruncatedTransactionException};
+     * fails with <i>errMsg</i> if it returns normally.
+     */
+    void expectAlreadyTruncatedTransactionException(ReadAheadEntryReader reader,
+                                                    String errMsg)
+            throws Exception {
+        try {
+            reader.checkLastException();
+            fail(errMsg);
+        } catch (AlreadyTruncatedTransactionException atte) {
+            // expected
+        }
+    }
+
+    /**
+     * Expects {@code checkLastException()} to throw {@link DLIllegalStateException};
+     * fails with <i>errMsg</i> if it returns normally.
+     */
+    void expectIllegalStateException(ReadAheadEntryReader reader,
+                                     String errMsg)
+            throws Exception {
+        try {
+            reader.checkLastException();
+            fail(errMsg);
+        } catch (DLIllegalStateException le) {
+            // expected
+        }
+    }
+
+    /**
+     * Calls {@code checkLastException()}; any exception it throws fails the calling test.
+     */
+    void expectNoException(ReadAheadEntryReader reader) throws Exception {
+        reader.checkLastException();
+    }
+
+    //
+    // Test Positioning
+    //
+
+    @Test(timeout = 60000)
+    public void testStartWithEmptySegmentList() throws Exception {
+        String streamName = runtime.getMethodName();
+        BKDistributedLogManager dlm = createNewDLM(baseConf, streamName);
+        ReadAheadEntryReader readAheadEntryReader =
+                createEntryReader(streamName, DLSN.InitialDLSN, dlm, baseConf);
+
+        readAheadEntryReader.start(Lists.<LogSegmentMetadata>newArrayList());
+
+        ensureOrderSchedulerEmpty(streamName);
+        assertFalse("ReadAhead should not be initialized with empty segment list",
+                readAheadEntryReader.isInitialized());
+        assertTrue("ReadAhead should be empty when it isn't initialized",
+                readAheadEntryReader.isCacheEmpty());
+        assertFalse("ReadAhead should not be marked as caught up when it isn't initialized",
+                readAheadEntryReader.isReadAheadCaughtUp());
+
+        // generate list of log segments
+        generateCompletedLogSegments(dlm, 1, MAX_CACHED_ENTRIES / 2 + 1);
+        List<LogSegmentMetadata> segments = dlm.getLogSegments();
+        assertEquals(segments.size() + " log segments found, expected to be only one",
+                1, segments.size());
+
+        // notify the readahead reader with new segment list
+        readAheadEntryReader.onSegmentsUpdated(segments);
+
+        // check the reader state after initialization
+        ensureOrderSchedulerEmpty(streamName);
+        assertTrue("ReadAhead should be initialized with non-empty segment list",
+                readAheadEntryReader.isInitialized());
+        assertNotNull("current segment reader should be initialized",
+                readAheadEntryReader.getCurrentSegmentReader());
+        assertEquals("current segment sequence number should be " + segments.get(0).getLogSegmentSequenceNumber(),
+                segments.get(0).getLogSegmentSequenceNumber(), readAheadEntryReader.getCurrentSegmentSequenceNumber());
+        assertNull("there should be no next segment reader",
+                readAheadEntryReader.getNextSegmentReader());
+        assertTrue("there should be no remaining segment readers",
+                readAheadEntryReader.getSegmentReaders().isEmpty());
+
+        Utils.close(readAheadEntryReader);
+        dlm.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testInitializeMultipleClosedLogSegments0() throws Exception {
+        // 5 completed log segments, start from the beginning
+        testInitializeMultipleClosedLogSegments(5, DLSN.InitialDLSN, 0);
+    }
+
+    @Test(timeout = 60000)
+    public void testInitializeMultipleClosedLogSegments1() throws Exception {
+        // 5 completed log segments, start from the 4th segment and it should skip first 3 log segments
+        testInitializeMultipleClosedLogSegments(5, new DLSN(4L, 0L, 0L), 3);
+    }
+
+    private void testInitializeMultipleClosedLogSegments(
+            int numLogSegments,
+            DLSN fromDLSN,
+            int expectedCurrentSegmentIdx
+    ) throws Exception {
+        String streamName = runtime.getMethodName();
+        BKDistributedLogManager dlm = createNewDLM(baseConf, streamName);
+
+        // generate list of log segments
+        generateCompletedLogSegments(dlm, 1, MAX_CACHED_ENTRIES / 2 + 1, 1L);
+        generateCompletedLogSegments(dlm, numLogSegments - 1, 1, MAX_CACHED_ENTRIES + 2);
+        List<LogSegmentMetadata> segments = dlm.getLogSegments();
+        assertEquals(segments.size() + " log segments found, expected to be " + numLogSegments,
+                numLogSegments, segments.size());
+
+        ReadAheadEntryReader readAheadEntryReader =
+                createEntryReader(streamName, fromDLSN, dlm, baseConf);
+        readAheadEntryReader.start(segments);
+
+        ensureOrderSchedulerEmpty(streamName);
+        assertTrue("ReadAhead should be initialized with non-empty segment list",
+                readAheadEntryReader.isInitialized());
+        assertNotNull("current segment reader should be initialized",
+                readAheadEntryReader.getCurrentSegmentReader());
+        assertTrue("current segment reader should be open and started",
+                readAheadEntryReader.getCurrentSegmentReader().isReaderOpen()
+                        && readAheadEntryReader.getCurrentSegmentReader().isReaderStarted());
+        assertEquals("current segment reader should read " + segments.get(expectedCurrentSegmentIdx),
+                segments.get(expectedCurrentSegmentIdx),
+                readAheadEntryReader.getCurrentSegmentReader().getSegment());
+        assertEquals("current segment sequence number should be "
+                + segments.get(expectedCurrentSegmentIdx).getLogSegmentSequenceNumber(),
+                segments.get(expectedCurrentSegmentIdx).getLogSegmentSequenceNumber(),
+                readAheadEntryReader.getCurrentSegmentSequenceNumber());
+        assertNull("next segment reader should not be initialized since it is a closed log segment",
+                readAheadEntryReader.getNextSegmentReader());
+        assertEquals("there should be " + (numLogSegments - (expectedCurrentSegmentIdx + 1))
+                + " remaining segment readers",
+                numLogSegments - (expectedCurrentSegmentIdx + 1),
+                readAheadEntryReader.getSegmentReaders().size());
+        int segmentIdx = expectedCurrentSegmentIdx + 1;
+        for (ReadAheadEntryReader.SegmentReader reader : readAheadEntryReader.getSegmentReaders()) {
+            LogSegmentMetadata expectedSegment = segments.get(segmentIdx);
+            assertEquals("Segment should " + expectedSegment,
+                    expectedSegment, reader.getSegment());
+            assertTrue("Segment reader for " + expectedSegment + " should be open",
+                    reader.isReaderOpen());
+            assertFalse("Segment reader for " + expectedSegment + " should not be started",
+                    reader.isReaderStarted());
+            ++segmentIdx;
+        }
+
+        Utils.close(readAheadEntryReader);
+        dlm.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testPositioningAtInvalidLogSegment() throws Exception {
+        String streamName = runtime.getMethodName();
+        BKDistributedLogManager dlm = createNewDLM(baseConf, streamName);
+
+        // generate list of log segments
+        generateCompletedLogSegments(dlm, 3, 2);
+        AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+        FutureUtils.result(writer.truncate(new DLSN(2L, 1L, 0L)));
+
+        List<LogSegmentMetadata> segments = dlm.getLogSegments();
+
+        // positioning on a truncated log segment (segment 1)
+        ReadAheadEntryReader readAheadEntryReader =
+                createEntryReader(streamName, DLSN.InitialDLSN, dlm, baseConf);
+        readAheadEntryReader.start(segments);
+        // ensure initialization to complete
+        ensureOrderSchedulerEmpty(streamName);
+        expectAlreadyTruncatedTransactionException(readAheadEntryReader,
+                "should fail on positioning to a truncated log segment");
+        Utils.close(readAheadEntryReader);
+
+        // positioning on a partially truncated log segment (segment 2) before min active dlsn
+        readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 0L, 0L), dlm, baseConf);
+        readAheadEntryReader.start(segments);
+        // ensure initialization to complete
+        ensureOrderSchedulerEmpty(streamName);
+        expectAlreadyTruncatedTransactionException(readAheadEntryReader,
+                "should fail on positioning to a partially truncated log segment");
+        Utils.close(readAheadEntryReader);
+
+        // positioning on a partially truncated log segment (segment 2) after min active dlsn
+        readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 1L, 0L), dlm, baseConf);
+        readAheadEntryReader.start(segments);
+        // ensure initialization to complete
+        ensureOrderSchedulerEmpty(streamName);
+        expectNoException(readAheadEntryReader);
+        Utils.close(readAheadEntryReader);
+
+        Utils.close(writer);
+        dlm.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testPositioningIgnoreTruncationStatus() throws Exception {
+        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+        confLocal.addConfiguration(baseConf);
+        confLocal.setIgnoreTruncationStatus(true);
+
+        String streamName = runtime.getMethodName();
+        BKDistributedLogManager dlm = createNewDLM(confLocal, streamName);
+
+        // generate list of log segments
+        generateCompletedLogSegments(dlm, 3, 2);
+        AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+        FutureUtils.result(writer.truncate(new DLSN(2L, 1L, 0L)));
+
+        List<LogSegmentMetadata> segments = dlm.getLogSegments();
+
+        // positioning on a truncated log segment (segment 1)
+        ReadAheadEntryReader readAheadEntryReader =
+                createEntryReader(streamName, DLSN.InitialDLSN, dlm, confLocal);
+        readAheadEntryReader.start(segments);
+        // ensure initialization to complete
+        ensureOrderSchedulerEmpty(streamName);
+        expectNoException(readAheadEntryReader);
+        Utils.close(readAheadEntryReader);
+
+        // positioning on a partially truncated log segment (segment 2) before min active dlsn
+        readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 0L, 0L), dlm, confLocal);
+        readAheadEntryReader.start(segments);
+        // ensure initialization to complete
+        ensureOrderSchedulerEmpty(streamName);
+        expectNoException(readAheadEntryReader);
+        Utils.close(readAheadEntryReader);
+
+        // positioning on a partially truncated log segment (segment 2) after min active dlsn
+        readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 1L, 0L), dlm, confLocal);
+        readAheadEntryReader.start(segments);
+        // ensure initialization to complete
+        ensureOrderSchedulerEmpty(streamName);
+        expectNoException(readAheadEntryReader);
+        Utils.close(readAheadEntryReader);
+
+        Utils.close(writer);
+        dlm.close();
+    }
+
+    //
+    // Test Reinitialization
+    //
+
+    @Test(timeout = 60000)
+    public void testLogSegmentSequenceNumberGap() throws Exception {
+        String streamName = runtime.getMethodName();
+        BKDistributedLogManager dlm = createNewDLM(baseConf, streamName);
+
+        // generate list of log segments
+        generateCompletedLogSegments(dlm, 3, 2);
+        List<LogSegmentMetadata> segments = dlm.getLogSegments();
+
+        ReadAheadEntryReader readAheadEntryReader =
+                createEntryReader(streamName, DLSN.InitialDLSN, dlm, baseConf);
+        readAheadEntryReader.start(segments.subList(0, 1));
+        int expectedCurrentSegmentIdx = 0;
+        ensureOrderSchedulerEmpty(streamName);
+        assertTrue("ReadAhead should be initialized with non-empty segment list",
+                readAheadEntryReader.isInitialized());
+        assertNotNull("current segment reader should be initialized",
+                readAheadEntryReader.getCurrentSegmentReader());
+        assertTrue("current segment reader should be open and started",
+                readAheadEntryReader.getCurrentSegmentReader().isReaderOpen()
+                        && readAheadEntryReader.getCurrentSegmentReader().isReaderStarted());
+        assertEquals("current segment reader should read " + segments.get(expectedCurrentSegmentIdx),
+                segments.get(expectedCurrentSegmentIdx),
+                readAheadEntryReader.getCurrentSegmentReader().getSegment());
+        assertEquals("current segment sequence number should be "
+                + segments.get(expectedCurrentSegmentIdx).getLogSegmentSequenceNumber(),
+                segments.get(expectedCurrentSegmentIdx).getLogSegmentSequenceNumber(),
+                readAheadEntryReader.getCurrentSegmentSequenceNumber());
+        assertNull("next segment reader should not be initialized since it is a closed log segment",
+                readAheadEntryReader.getNextSegmentReader());
+
+        readAheadEntryReader.onSegmentsUpdated(segments.subList(2, 3));
+        ensureOrderSchedulerEmpty(streamName);
+        expectIllegalStateException(readAheadEntryReader,
+                "inconsistent log segment found");
+
+        Utils.close(readAheadEntryReader);
+        dlm.close();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java index 3c8669b..1739b47 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java @@ -17,6 +17,7 @@ */ package com.twitter.distributedlog; +import com.twitter.distributedlog.util.Utils; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; import org.slf4j.Logger; @@ -121,6 +122,9 @@ public class TestReader implements FutureEventListener<LogRecordWithDLSN> { } nextDLSN = dlsn; LOG.info("Positioned reader {} at {}", readerName, dlsn); + if (null != TestReader.this.reader) { + Utils.close(TestReader.this.reader); + } TestReader.this.reader = reader; readNext(); readyLatch.countDown(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java index 027b012..ee53362 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import com.twitter.distributedlog.feature.CoreFeatureKeys; +import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryReader; import 
com.twitter.distributedlog.util.FailpointUtils; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.Utils; @@ -318,11 +319,11 @@ public class TestRollLogSegments extends TestDistributedLogBase { } assertEquals(expectedWriterPosition, getLedgerHandle(writer).getLastAddConfirmed()); assertEquals(expectedLac, inspector.readLastConfirmed()); - LedgerReadPosition readPosition = reader.bkLedgerManager.readAheadWorker.getNextReadAheadPosition(); + EntryPosition readPosition = reader.getReadAheadReader().getNextEntryPosition(); logger.info("ReadAhead moved read position {} : ", readPosition); while (readPosition.getEntryId() < expectedReaderPosition) { Thread.sleep(1000); - readPosition = reader.bkLedgerManager.readAheadWorker.getNextReadAheadPosition(); + readPosition = reader.getReadAheadReader().getNextEntryPosition(); logger.info("ReadAhead moved read position {} : ", readPosition); } assertEquals(expectedReaderPosition, readPosition.getEntryId()); @@ -386,7 +387,12 @@ public class TestRollLogSegments extends TestDistributedLogBase { // Writer moved to lac = 11, while reader knows lac = 10 and moving to wait on 11 checkAndWaitWriterReaderPosition(perStreamWriter, 11, reader, 11, readLh, 10); - while (null == reader.bkLedgerManager.readAheadWorker.getMetadataNotification()) { + while (true) { + BKLogSegmentEntryReader entryReader = + (BKLogSegmentEntryReader) reader.getReadAheadReader().getCurrentSegmentReader().getEntryReader(); + if (null != entryReader && null != entryReader.getOutstandingLongPoll()) { + break; + } Thread.sleep(1000); } logger.info("Waiting for long poll getting interrupted with metadata changed"); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestTruncate.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestTruncate.java 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestTruncate.java index 98d2020..011fc70 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestTruncate.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestTruncate.java @@ -329,7 +329,6 @@ public class TestTruncate extends TestDistributedLogBase { LogRecord r = reader.readNext(false); while (null != r) { DLMTestUtil.verifyLogRecord(r); - LOG.trace("Read entry {}.", r.getTransactionId()); assertEquals(txid++, r.getTransactionId()); ++numRead; r = reader.readNext(false); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java index 9a194c4..88840a0 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java @@ -32,24 +32,22 @@ import com.twitter.distributedlog.LogSegmentMetadata; import com.twitter.distributedlog.TestDistributedLogBase; import com.twitter.distributedlog.exceptions.EndOfLogSegmentException; import com.twitter.distributedlog.exceptions.ReadCancelledException; +import com.twitter.distributedlog.injector.AsyncFailureInjector; +import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.Utils; import com.twitter.util.Future; -import org.apache.bookkeeper.client.BookKeeper; -import 
org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.stats.NullStatsLogger; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.List; import java.util.concurrent.TimeUnit; -import static com.google.common.base.Charsets.UTF_8; import static org.junit.Assert.*; /** @@ -92,25 +90,9 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { long startEntryId, DistributedLogConfiguration conf) throws Exception { - LedgerHandle lh; - if (segment.isInProgress()) { - lh = bkc.get().openLedgerNoRecovery( - segment.getLedgerId(), - BookKeeper.DigestType.CRC32, - conf.getBKDigestPW().getBytes(UTF_8)); - } else { - lh = bkc.get().openLedger( - segment.getLedgerId(), - BookKeeper.DigestType.CRC32, - conf.getBKDigestPW().getBytes(UTF_8)); - } - return new BKLogSegmentEntryReader( - segment, - lh, - startEntryId, - bkc.get(), - scheduler, - conf); + LogSegmentEntryStore store = new BKLogSegmentEntryStore( + conf, bkc.get(), scheduler, NullStatsLogger.INSTANCE, AsyncFailureInjector.NULL); + return (BKLogSegmentEntryReader) FutureUtils.result(store.openReader(segment, startEntryId)); } void generateCompletedLogSegments(DistributedLogManager dlm, @@ -186,6 +168,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { ++entryId; } assertEquals(21, txId); + assertFalse(reader.hasCaughtUpOnInprogress()); Utils.close(reader); } @@ -216,6 +199,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { // expected } } + assertFalse(reader.hasCaughtUpOnInprogress()); assertTrue("Reader should be closed yet", reader.isClosed()); } @@ -245,9 +229,9 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { long txId = 1L; long entryId = 0L; - assertEquals(10, reader.readAheadEntries.size()); assertEquals(10, reader.getNextEntryId()); + 
assertFalse(reader.hasCaughtUpOnInprogress()); // read first entry Entry.Reader entryReader = FutureUtils.result(reader.readNext(1)).get(0); LogRecordWithDLSN record = entryReader.nextRecord(); @@ -271,6 +255,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { assertEquals(10, reader.readAheadEntries.size()); assertEquals(11, reader.getNextEntryId()); + assertFalse(reader.hasCaughtUpOnInprogress()); Utils.close(reader); } @@ -326,6 +311,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { assertEquals(5, reader.readAheadEntries.size()); assertEquals(6, reader.getNextEntryId()); + assertFalse(reader.hasCaughtUpOnInprogress()); Utils.close(reader); } @@ -376,6 +362,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { assertEquals(2L, txId); assertEquals(reader.getLastAddConfirmed(), reader.readAheadEntries.size()); assertEquals((reader.getLastAddConfirmed() + 1), reader.getNextEntryId()); + assertFalse(reader.hasCaughtUpOnInprogress()); Utils.close(reader); } @@ -434,6 +421,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { // the long poll will be satisfied List<Entry.Reader> nextReadEntries = FutureUtils.result(nextReadFuture); assertEquals(1, nextReadEntries.size()); + assertTrue(reader.hasCaughtUpOnInprogress()); Entry.Reader entryReader = nextReadEntries.get(0); LogRecordWithDLSN record = entryReader.nextRecord(); assertNotNull(record); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentIsTruncatedException.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentIsTruncatedException.java b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentIsTruncatedException.java new file mode 100644 index 
0000000..2b56550 --- /dev/null +++ b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentIsTruncatedException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.exceptions; + +import com.twitter.distributedlog.thrift.service.StatusCode; + +/** + * Exception is thrown when reading data from a truncated log segment. 
+ */ +public class LogSegmentIsTruncatedException extends DLException { + + private static final long serialVersionUID = -218506870918498062L; + + public LogSegmentIsTruncatedException(String logSegmentName) { + super(StatusCode.LOG_SEGMENT_IS_TRUNCATED, "Log Segment '" + + logSegmentName + "'" + " is already truncated"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-protocol/src/main/thrift/service.thrift ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/thrift/service.thrift b/distributedlog-protocol/src/main/thrift/service.thrift index a67664c..c2f77f3 100644 --- a/distributedlog-protocol/src/main/thrift/service.thrift +++ b/distributedlog-protocol/src/main/thrift/service.thrift @@ -98,6 +98,8 @@ enum StatusCode { LOG_SEGMENT_NOT_FOUND = 525, // End of Log Segment END_OF_LOG_SEGMENT = 526, + // Log Segment Is Truncated + LOG_SEGMENT_IS_TRUNCATED = 527, /* 6xx: unexpected */