http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java index b0a38cf..c403e26 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java @@ -17,12 +17,12 @@ */ package com.twitter.distributedlog; +import com.twitter.distributedlog.impl.BKNamespaceDriver; import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter; -import com.twitter.distributedlog.logsegment.LogSegmentFilter; -import com.twitter.distributedlog.metadata.BKDLConfig; -import com.twitter.distributedlog.metadata.DLMetadata; +import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; import com.twitter.distributedlog.namespace.DistributedLogNamespace; import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; +import com.twitter.distributedlog.namespace.NamespaceDriver; import com.twitter.distributedlog.util.ConfUtils; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.PermitLimiter; @@ -35,11 +35,7 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.feature.SettableFeatureProvider; -import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.versioning.Version; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,11 +85,6 @@ public class DLMTestUtil { return segments; } - static void updateBKDLConfig(URI uri, String zkServers, String ledgersPath, boolean sanityCheckTxnID) throws Exception { - BKDLConfig bkdlConfig = new BKDLConfig(zkServers, ledgersPath).setSanityCheckTxnID(sanityCheckTxnID); - DLMetadata.create(bkdlConfig).update(uri); - } - public static URI createDLMURI(int port, String path) throws Exception { return LocalDLMEmulator.createDLMURI("127.0.0.1:" + port, path); } @@ -111,93 +102,18 @@ public class DLMTestUtil { URI uri) throws Exception { // TODO: Metadata Accessor seems to be a legacy object which only used by kestrel // (we might consider deprecating this) - BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder() + DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() .conf(conf).uri(uri).build(); - return namespace.createMetadataAccessor(name); - } - - public static class BKLogPartitionWriteHandlerAndClients { - private BKLogWriteHandler writeHandler; - private ZooKeeperClient zooKeeperClient; - private BookKeeperClient bookKeeperClient; - - public BKLogPartitionWriteHandlerAndClients(BKLogWriteHandler writeHandler, ZooKeeperClient zooKeeperClient, BookKeeperClient bookKeeperClient) { - this.writeHandler = writeHandler; - this.zooKeeperClient = zooKeeperClient; - this.bookKeeperClient = bookKeeperClient; - } - - public void close() { - bookKeeperClient.close(); - zooKeeperClient.close(); - Utils.closeQuietly(writeHandler); - } - - public BKLogWriteHandler getWriteHandler() { - return writeHandler; - } - } - - static BKLogPartitionWriteHandlerAndClients createNewBKDLM(DistributedLogConfiguration conf, - String logName, - int zkPort) throws Exception { - URI uri = createDLMURI(zkPort, "/" + logName); - - ZooKeeperClientBuilder zkcBuilder = TestZooKeeperClientBuilder.newBuilder(conf) - .name(String.format("dlzk:%s:handler_dedicated", logName)) - .uri(uri); - - ZooKeeperClient zkClient = zkcBuilder.build(); - - try { - zkClient.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException nee) { - // ignore - } - - // resolve uri - BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkClient, uri); - BKDLConfig.propagateConfiguration(bkdlConfig, conf); - BookKeeperClientBuilder bkcBuilder = BookKeeperClientBuilder.newBuilder() - .dlConfig(conf) - .name(String.format("bk:%s:handler_dedicated", logName)) - .zkServers(bkdlConfig.getBkZkServersForWriter()) - .ledgersPath(bkdlConfig.getBkLedgersPath()) - .statsLogger(NullStatsLogger.INSTANCE); - - BKDistributedLogManager bkdlm = new BKDistributedLogManager( - logName, - conf, - uri, - zkcBuilder, - zkcBuilder, - zkClient, - zkClient, - bkcBuilder, - bkcBuilder, - new SettableFeatureProvider("", 0), - PermitLimiter.NULL_PERMIT_LIMITER, - NullStatsLogger.INSTANCE); - - BKLogWriteHandler writeHandler = bkdlm.createWriteHandler(true); - return new BKLogPartitionWriteHandlerAndClients(writeHandler, zkClient, bkcBuilder.build()); + return namespace.getNamespaceDriver().getMetadataAccessor(name); } public static void fenceStream(DistributedLogConfiguration conf, URI uri, String name) throws Exception { - BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(name, conf, uri); + DistributedLogManager dlm = createNewDLM(name, conf, uri); try { - BKLogReadHandler readHandler = dlm.createReadHandler(); - List<LogSegmentMetadata> ledgerList = FutureUtils.result( - readHandler.readLogSegmentsFromStore( - LogSegmentMetadata.COMPARATOR, - LogSegmentFilter.DEFAULT_FILTER, - null) - ).getValue(); - LogSegmentMetadata lastSegment = ledgerList.get(ledgerList.size() - 1); - BookKeeperClient bkc = dlm.getWriterBKC(); - LedgerHandle lh = bkc.get().openLedger(lastSegment.getLogSegmentId(), - BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8)); - lh.close(); + List<LogSegmentMetadata> logSegmentList = dlm.getLogSegments(); + LogSegmentMetadata lastSegment = logSegmentList.get(logSegmentList.size() - 1); + LogSegmentEntryStore entryStore = dlm.getNamespaceDriver().getLogSegmentEntryStore(NamespaceDriver.Role.READER); + Utils.close(FutureUtils.result(entryStore.openRandomAccessReader(lastSegment, true))); } finally { dlm.close(); } @@ -409,6 +325,14 @@ public class DLMTestUtil { return txid - startTxid; } + public static ZooKeeperClient getZooKeeperClient(BKDistributedLogManager dlm) { + return ((BKNamespaceDriver) dlm.getNamespaceDriver()).getWriterZKC(); + } + + public static BookKeeperClient getBookKeeperClient(BKDistributedLogManager dlm) { + return ((BKNamespaceDriver) dlm.getNamespaceDriver()).getReaderBKC(); + } + public static void injectLogSegmentWithGivenLogSegmentSeqNo(DistributedLogManager manager, DistributedLogConfiguration conf, long logSegmentSeqNo, long startTxID, boolean writeEntries, long segmentSize, boolean completeLogSegment) @@ -417,7 +341,7 @@ public class DLMTestUtil { BKLogWriteHandler writeHandler = dlm.createWriteHandler(false); FutureUtils.result(writeHandler.lockHandler()); // Start a log segment with a given ledger seq number. - BookKeeperClient bkc = dlm.getWriterBKC(); + BookKeeperClient bkc = getBookKeeperClient(dlm); LedgerHandle lh = bkc.get().createLedger(conf.getEnsembleSize(), conf.getWriteQuorumSize(), conf.getAckQuorumSize(), BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes()); String inprogressZnodeName = writeHandler.inprogressZNodeName(lh.getId(), startTxID, logSegmentSeqNo); @@ -429,7 +353,7 @@ public class DLMTestUtil { .setLogSegmentSequenceNo(logSegmentSeqNo) .setEnvelopeEntries(LogSegmentMetadata.supportsEnvelopedEntries(logSegmentMetadataVersion)) .build(); - l.write(dlm.writerZKC); + l.write(getZooKeeperClient(dlm)); writeHandler.maxTxId.update(Version.ANY, startTxID); writeHandler.addLogSegmentToCache(inprogressZnodeName, l); BKLogSegmentWriter writer = new BKLogSegmentWriter( @@ -468,7 +392,7 @@ public class DLMTestUtil { BKLogWriteHandler writeHandler = dlm.createWriteHandler(false); FutureUtils.result(writeHandler.lockHandler()); // Start a log segment with a given ledger seq number. - BookKeeperClient bkc = dlm.getReaderBKC(); + BookKeeperClient bkc = getBookKeeperClient(dlm); LedgerHandle lh = bkc.get().createLedger(conf.getEnsembleSize(), conf.getWriteQuorumSize(), conf.getAckQuorumSize(), BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes()); String inprogressZnodeName = writeHandler.inprogressZNodeName(lh.getId(), startTxID, logSegmentSeqNo); @@ -479,7 +403,7 @@ public class DLMTestUtil { .setLogSegmentSequenceNo(logSegmentSeqNo) .setInprogress(false) .build(); - l.write(dlm.writerZKC); + l.write(getZooKeeperClient(dlm)); writeHandler.maxTxId.update(Version.ANY, startTxID); writeHandler.addLogSegmentToCache(inprogressZnodeName, l); BKLogSegmentWriter writer = new BKLogSegmentWriter(
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java index a6cffbb..124ea77 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java @@ -29,9 +29,11 @@ import java.util.concurrent.atomic.AtomicReference; import com.twitter.distributedlog.exceptions.LockCancelledException; import com.twitter.distributedlog.exceptions.LockingException; import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; +import com.twitter.distributedlog.impl.BKNamespaceDriver; import com.twitter.distributedlog.lock.LockClosedException; import com.twitter.distributedlog.namespace.DistributedLogNamespace; import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; +import com.twitter.distributedlog.namespace.NamespaceDriver; import com.twitter.distributedlog.subscription.SubscriptionsStore; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.Utils; @@ -85,7 +87,8 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { Utils.close(reader1); // simulate a old stream created without readlock path - writer.bkDistributedLogManager.getWriterZKC().get().delete(readLockPath, -1); + NamespaceDriver driver = dlm.getNamespaceDriver(); + ((BKNamespaceDriver) driver).getWriterZKC().get().delete(readLockPath, -1); Future<AsyncLogReader> futureReader2 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN); AsyncLogReader reader2 = Await.result(futureReader2); record = Await.result(reader2.readNext()); @@ -230,7 +233,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { DistributedLogManager dlm1 = createNewDLM(conf, name); Future<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); - Await.result(futureReader1); + AsyncLogReader reader1 = Await.result(futureReader1); BKDistributedLogManager dlm2 = (BKDistributedLogManager) createNewDLM(conf, name); Future<AsyncLogReader> futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN); @@ -243,6 +246,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { } catch (LockCancelledException ex) { } + Utils.close(reader1); dlm0.close(); dlm1.close(); } @@ -250,16 +254,26 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { @Test(timeout = 60000) public void testReaderLockSessionExpires() throws Exception { String name = runtime.getMethodName(); - DistributedLogManager dlm0 = createNewDLM(conf, name); + URI uri = createDLMURI("/" + name); + ensureURICreated(uri); + DistributedLogNamespace ns0 = DistributedLogNamespaceBuilder.newBuilder() + .conf(conf) + .uri(uri) + .build(); + DistributedLogManager dlm0 = ns0.openLog(name); BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm0.startAsyncLogSegmentNonPartitioned()); writer.write(DLMTestUtil.getLogRecordInstance(1L)); writer.write(DLMTestUtil.getLogRecordInstance(2L)); writer.closeAndComplete(); - DistributedLogManager dlm1 = createNewDLM(conf, name); + DistributedLogNamespace ns1 = DistributedLogNamespaceBuilder.newBuilder() + .conf(conf) + .uri(uri) + .build(); + DistributedLogManager dlm1 = ns1.openLog(name); Future<AsyncLogReader> futureReader1 = dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN); AsyncLogReader reader1 = Await.result(futureReader1); - ZooKeeperClientUtils.expireSession(((BKDistributedLogManager)dlm1).getWriterZKC(), zkServers, 1000); + ZooKeeperClientUtils.expireSession(((BKNamespaceDriver) ns1.getNamespaceDriver()).getWriterZKC(), zkServers, 1000); // The result of expireSession is somewhat non-deterministic with this lock. // It may fail with LockingException or it may succesfully reacquire, so for @@ -276,7 +290,9 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { Utils.close(reader1); dlm0.close(); + ns0.close(); dlm1.close(); + ns1.close(); } @Test(timeout = 60000) @@ -511,8 +527,11 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { Utils.close(Await.result(futureReader3)); dlm1.close(); + namespace1.close(); dlm2.close(); + namespace2.close(); dlm3.close(); + namespace3.close(); executorService.shutdown(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java index 41adbb9..46c8523 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java @@ -36,6 +36,7 @@ import com.twitter.distributedlog.config.ConcurrentConstConfiguration; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.BKTransmitException; import com.twitter.distributedlog.exceptions.LockingException; +import com.twitter.distributedlog.impl.BKNamespaceDriver; import com.twitter.distributedlog.io.CompressionCodec; import com.twitter.distributedlog.util.Utils; import com.twitter.util.Promise; @@ -1270,8 +1271,9 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { BKLogSegmentWriter logWriter = writer.getCachedLogWriter(); + BKNamespaceDriver driver = (BKNamespaceDriver) dlm.getNamespaceDriver(); // fence the ledger - dlm.getWriterBKC().get().openLedger(logWriter.getLogSegmentId(), + driver.getReaderBKC().get().openLedger(logWriter.getLogSegmentId(), BookKeeper.DigestType.CRC32, confLocal.getBKDigestPW().getBytes(UTF_8)); try { @@ -1313,8 +1315,9 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { BKLogSegmentWriter logWriter = writer.getCachedLogWriter(); + BKNamespaceDriver driver = (BKNamespaceDriver) dlm.getNamespaceDriver(); // fence the ledger - dlm.getWriterBKC().get().openLedger(logWriter.getLogSegmentId(), + driver.getReaderBKC().get().openLedger(logWriter.getLogSegmentId(), BookKeeper.DigestType.CRC32, confLocal.getBKDigestPW().getBytes(UTF_8)); try { @@ -1500,6 +1503,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { confLocal.setImmediateFlushEnabled(true); confLocal.setReadAheadBatchSize(1); confLocal.setReadAheadMaxRecords(1); + confLocal.setReadLACLongPollTimeout(49); confLocal.setReaderIdleWarnThresholdMillis(100); confLocal.setReaderIdleErrorThresholdMillis(20000); final DistributedLogManager dlm = createNewDLM(confLocal, name); @@ -1976,7 +1980,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { List<LogSegmentMetadata> segments = dlm.getLogSegments(); assertEquals(1, segments.size()); long ledgerId = segments.get(0).getLogSegmentId(); - LedgerHandle lh = ((BKDistributedLogNamespace) namespace).getReaderBKC() + LedgerHandle lh = ((BKNamespaceDriver) namespace.getNamespaceDriver()).getReaderBKC() .get().openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32, confLocal.getBKDigestPW().getBytes(UTF_8)); LedgerMetadata metadata = BookKeeperAccessor.getLedgerMetadata(lh); assertEquals(DistributedLogConfiguration.BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT, metadata.getEnsembleSize()); @@ -1995,7 +1999,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { segments = dlm.getLogSegments(); assertEquals(1, segments.size()); ledgerId = segments.get(0).getLogSegmentId(); - lh = ((BKDistributedLogNamespace) namespace).getReaderBKC() + lh = ((BKNamespaceDriver) namespace.getNamespaceDriver()).getReaderBKC() .get().openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32, confLocal.getBKDigestPW().getBytes(UTF_8)); metadata = BookKeeperAccessor.getLedgerMetadata(lh); assertEquals(DistributedLogConfiguration.BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT - 1, metadata.getEnsembleSize()); @@ -2147,6 +2151,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { confLocal.setImmediateFlushEnabled(false); confLocal.setPeriodicFlushFrequencyMilliSeconds(0); confLocal.setPeriodicKeepAliveMilliSeconds(0); + confLocal.setReadLACLongPollTimeout(9); confLocal.setReaderIdleWarnThresholdMillis(20); confLocal.setReaderIdleErrorThresholdMillis(40); @@ -2178,6 +2183,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { confLocal.setImmediateFlushEnabled(false); confLocal.setPeriodicFlushFrequencyMilliSeconds(0); confLocal.setPeriodicKeepAliveMilliSeconds(1000); + confLocal.setReadLACLongPollTimeout(999); confLocal.setReaderIdleWarnThresholdMillis(2000); confLocal.setReaderIdleErrorThresholdMillis(4000); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java index 96c33e2..f7d587d 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -32,6 +33,7 @@ import com.twitter.distributedlog.exceptions.LogEmptyException; import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.LogReadException; import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore; +import com.twitter.distributedlog.io.Abortables; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; @@ -48,14 +50,12 @@ import com.twitter.distributedlog.callback.LogSegmentListener; import com.twitter.distributedlog.exceptions.EndOfStreamException; import com.twitter.distributedlog.exceptions.InvalidStreamNameException; import com.twitter.distributedlog.exceptions.LogRecordTooLongException; -import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException; import com.twitter.distributedlog.metadata.LogMetadata; import com.twitter.distributedlog.metadata.MetadataUpdater; import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater; import com.twitter.distributedlog.namespace.DistributedLogNamespace; import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; -import com.twitter.distributedlog.subscription.SubscriptionStateStore; import com.twitter.distributedlog.subscription.SubscriptionsStore; import com.twitter.util.Await; import com.twitter.util.Duration; @@ -67,6 +67,8 @@ import static org.junit.Assert.assertEquals; public class TestBKDistributedLogManager extends TestDistributedLogBase { static final Logger LOG = LoggerFactory.getLogger(TestBKDistributedLogManager.class); + private static final Random RAND = new Random(System.currentTimeMillis()); + @Rule public TestName testNames = new TestName(); @@ -254,20 +256,6 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { } @Test(timeout = 60000) - public void testTwoWriters() throws Exception { - DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlm1 = - createNewBKDLM(conf, "distrlog-dualWriter"); - try { - createNewBKDLM(conf, "distrlog-dualWriter"); - fail("Shouldn't have been able to open the second writer"); - } catch (OwnershipAcquireFailedException ioe) { - assertEquals(ioe.getCurrentOwner(), DistributedLogConstants.UNKNOWN_CLIENT_ID); - } - - bkdlm1.close(); - } - - @Test(timeout = 60000) public void testTwoWritersOnLockDisabled() throws Exception { DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); confLocal.addConfiguration(conf); @@ -468,11 +456,10 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { writer.setReadyToFlush(); writer.flushAndSync(); writer.close(); - dlm.createOrUpdateMetadata(name.getBytes()); - assertEquals(name, new String(dlm.getMetadata())); + dlm.close(); URI uri = createDLMURI("/" + name); - BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder() + DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() .conf(conf).uri(uri).build(); assertTrue(namespace.logExists(name)); assertFalse(namespace.logExists("non-existent-log")); @@ -490,9 +477,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { } assertEquals(1, logCount); - for(Map.Entry<String, byte[]> logEntry: namespace.enumerateLogsWithMetadataInNamespace().entrySet()) { - assertEquals(name, new String(logEntry.getValue())); - } + namespace.close(); } @Test(timeout = 60000) @@ -507,28 +492,6 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { } @Test(timeout = 60000) - @Deprecated - public void testSubscriptionStateStore() throws Exception { - String name = "distrlog-subscription-state"; - String subscriberId = "defaultSubscriber"; - DLSN commitPosition0 = new DLSN(4, 33, 5); - DLSN commitPosition1 = new DLSN(4, 34, 5); - DLSN commitPosition2 = new DLSN(5, 34, 5); - - DistributedLogManager dlm = createNewDLM(conf, name); - SubscriptionStateStore store = dlm.getSubscriptionStateStore(subscriberId); - assertEquals(Await.result(store.getLastCommitPosition()), DLSN.NonInclusiveLowerBound); - Await.result(store.advanceCommitPosition(commitPosition1)); - assertEquals(Await.result(store.getLastCommitPosition()), commitPosition1); - Await.result(store.advanceCommitPosition(commitPosition0)); - assertEquals(Await.result(store.getLastCommitPosition()), commitPosition1); - Await.result(store.advanceCommitPosition(commitPosition2)); - assertEquals(Await.result(store.getLastCommitPosition()), commitPosition2); - SubscriptionStateStore store1 = dlm.getSubscriptionStateStore(subscriberId); - assertEquals(Await.result(store1.getLastCommitPosition()), commitPosition2); - } - - @Test(timeout = 60000) public void testSubscriptionsStore() throws Exception { String name = "distrlog-subscriptions-store"; String subscriber0 = "subscriber-0"; @@ -732,24 +695,12 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { reader.close(); } - @Test(timeout = 60000) + @Test(timeout = 60000, expected = LogRecordTooLongException.class) public void testMaxLogRecSize() throws Exception { - DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlmAndClients = - createNewBKDLM(conf, "distrlog-maxlogRecSize"); - long txid = 1; - BKLogSegmentWriter out = bkdlmAndClients.getWriteHandler().startLogSegment(1); - boolean exceptionEncountered = false; - try { - LogRecord op = new LogRecord(txid, DLMTestUtil.repeatString( - DLMTestUtil.repeatString("abcdefgh", 256), 512).getBytes()); - out.write(op); - } catch (LogRecordTooLongException exc) { - exceptionEncountered = true; - } finally { - FutureUtils.result(out.asyncClose()); - } - bkdlmAndClients.close(); - assertTrue(exceptionEncountered); + DistributedLogManager dlm = createNewDLM(conf, "distrlog-maxlogRecSize"); + AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter()); + FutureUtils.result(writer.write(new LogRecord(1L, DLMTestUtil.repeatString( + DLMTestUtil.repeatString("abcdefgh", 256), 512).getBytes()))); } @Test(timeout = 60000) @@ -757,25 +708,27 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); confLocal.loadConf(conf); confLocal.setOutputBufferSize(1024 * 1024); - DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlmAndClients = - createNewBKDLM(confLocal, "distrlog-transmissionSize"); - long txid = 1; - BKLogSegmentWriter out = bkdlmAndClients.getWriteHandler().startLogSegment(1); + BKDistributedLogManager dlm = + createNewDLM(confLocal, "distrlog-transmissionSize"); + AsyncLogWriter out = FutureUtils.result(dlm.openAsyncLogWriter()); boolean exceptionEncountered = false; - byte[] largePayload = DLMTestUtil.repeatString(DLMTestUtil.repeatString("abcdefgh", 256), 256).getBytes(); + byte[] largePayload = new byte[(LogRecord.MAX_LOGRECORDSET_SIZE / 2) + 2]; + RAND.nextBytes(largePayload); try { - while (txid < 3) { - LogRecord op = new LogRecord(txid, largePayload); - out.write(op); - txid++; - } + LogRecord op = new LogRecord(1L, largePayload); + Future<DLSN> firstWriteFuture = out.write(op); + op = new LogRecord(2L, largePayload); + // the second write will flush the first one, since we reached the maximum transmission size. + out.write(op); + FutureUtils.result(firstWriteFuture); } catch (LogRecordTooLongException exc) { exceptionEncountered = true; } finally { FutureUtils.result(out.asyncClose()); } - bkdlmAndClients.close(); - assertTrue(!exceptionEncountered); + assertFalse(exceptionEncountered); + Abortables.abortQuietly(out); + dlm.close(); } @Test(timeout = 60000) http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java index ecc20e0..a8a82fa 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java @@ -34,9 +34,10 @@ import com.twitter.distributedlog.exceptions.AlreadyClosedException; import com.twitter.distributedlog.exceptions.InvalidStreamNameException; import com.twitter.distributedlog.exceptions.LockingException; import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.impl.BKDLUtils; +import com.twitter.distributedlog.impl.BKNamespaceDriver; import com.twitter.distributedlog.namespace.DistributedLogNamespace; import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; +import com.twitter.distributedlog.util.DLUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; @@ -112,63 +113,13 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase { } @Test(timeout = 60000) - @SuppressWarnings("deprecation") - public void testClientSharingOptions() throws Exception { - URI uri = createDLMURI("/clientSharingOptions"); - BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder() - .conf(conf).uri(uri).build(); - - { - BKDistributedLogManager bkdlm1 = (BKDistributedLogManager)namespace.createDistributedLogManager("perstream1", - DistributedLogManagerFactory.ClientSharingOption.PerStreamClients); - - BKDistributedLogManager bkdlm2 = (BKDistributedLogManager)namespace.createDistributedLogManager("perstream2", - DistributedLogManagerFactory.ClientSharingOption.PerStreamClients); - - assertThat(bkdlm1.getReaderBKC(), not(bkdlm2.getReaderBKC())); - assertThat(bkdlm1.getWriterBKC(), not(bkdlm2.getWriterBKC())); - assertThat(bkdlm1.getReaderZKC(), not(bkdlm2.getReaderZKC())); - assertThat(bkdlm1.getWriterZKC(), not(bkdlm2.getWriterZKC())); - - } - - { - BKDistributedLogManager bkdlm1 = (BKDistributedLogManager)namespace.createDistributedLogManager("sharedZK1", - DistributedLogManagerFactory.ClientSharingOption.SharedZKClientPerStreamBKClient); - - BKDistributedLogManager bkdlm2 = (BKDistributedLogManager)namespace.createDistributedLogManager("sharedZK2", - DistributedLogManagerFactory.ClientSharingOption.SharedZKClientPerStreamBKClient); - - assertThat(bkdlm1.getReaderBKC(), not(bkdlm2.getReaderBKC())); - assertThat(bkdlm1.getWriterBKC(), not(bkdlm2.getWriterBKC())); - assertEquals(bkdlm1.getReaderZKC(), bkdlm2.getReaderZKC()); - assertEquals(bkdlm1.getWriterZKC(), bkdlm2.getWriterZKC()); - } - - { - BKDistributedLogManager bkdlm1 = (BKDistributedLogManager)namespace.createDistributedLogManager("sharedBoth1", - DistributedLogManagerFactory.ClientSharingOption.SharedClients); - - BKDistributedLogManager bkdlm2 = (BKDistributedLogManager)namespace.createDistributedLogManager("sharedBoth2", - DistributedLogManagerFactory.ClientSharingOption.SharedClients); - - assertEquals(bkdlm1.getReaderBKC(), bkdlm2.getReaderBKC()); - assertEquals(bkdlm1.getWriterBKC(), bkdlm2.getWriterBKC()); - assertEquals(bkdlm1.getReaderZKC(), bkdlm2.getReaderZKC()); - assertEquals(bkdlm1.getWriterZKC(), bkdlm2.getWriterZKC()); - } - - } - - - @Test(timeout = 60000) public void testInvalidStreamName() throws Exception { - assertFalse(BKDLUtils.isReservedStreamName("test")); - assertTrue(BKDLUtils.isReservedStreamName(".test")); + assertFalse(DLUtils.isReservedStreamName("test")); + assertTrue(DLUtils.isReservedStreamName(".test")); URI uri = createDLMURI("/" + runtime.getMethodName()); - BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder() + DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() .conf(conf).uri(uri).build(); try { @@ -238,11 +189,6 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase { assertTrue(streamSet.contains("test1")); assertTrue(streamSet.contains("test_2-3")); - Map<String, byte[]> streamMetadatas = namespace.enumerateLogsWithMetadataInNamespace(); - assertEquals(2, streamMetadatas.size()); - assertTrue(streamMetadatas.containsKey("test1")); - assertTrue(streamMetadatas.containsKey("test_2-3")); - namespace.close(); } @@ -385,7 +331,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase { static void validateBadAllocatorConfiguration(DistributedLogConfiguration conf, URI uri) throws Exception { try { - BKDistributedLogNamespace.validateAndGetFullLedgerAllocatorPoolPath(conf, uri); + BKNamespaceDriver.validateAndGetFullLedgerAllocatorPoolPath(conf, uri); fail("Should throw exception when bad allocator configuration provided"); } catch (IOException ioe) { // expected @@ -425,7 +371,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase { @Test(timeout = 60000) public void testUseNamespaceAfterCloseShouldFailFast() throws Exception { URI uri = createDLMURI("/" + runtime.getMethodName()); - BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder() + DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() .conf(conf) .uri(uri) .build(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java index 4b17500..854cb74 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java @@ -50,27 +50,6 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { @Rule public TestName runtime = new TestName(); - private void prepareLogSegments(String name, int numSegments, int numEntriesPerSegment) throws Exception { - DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlmAndClients = createNewBKDLM(conf, name); - long txid = 1; - for (int sid = 0; sid < numSegments; ++sid) { - BKLogSegmentWriter out = bkdlmAndClients.getWriteHandler().startLogSegment(txid); - for (int eid = 0; eid < numEntriesPerSegment; ++eid) { - LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid); - out.write(record); - ++txid; - } - FutureUtils.result(out.asyncClose()); - bkdlmAndClients.getWriteHandler().completeAndCloseLogSegment( - out.getLogSegmentSequenceNumber(), - out.getLogSegmentId(), - 1 + sid * numEntriesPerSegment, - (sid + 1) * numEntriesPerSegment, - numEntriesPerSegment); - } - bkdlmAndClients.close(); - } - private void prepareLogSegmentsNonPartitioned(String name, int numSegments, int numEntriesPerSegment) throws Exception { DistributedLogManager dlm = createNewDLM(conf, name); long txid = 1; @@ -134,8 +113,8 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { @Test(timeout = 60000) public void testGetFirstDLSNWithLogSegments() throws Exception { String dlName = runtime.getMethodName(); - prepareLogSegments(dlName, 3, 3); BKDistributedLogManager dlm = createNewDLM(conf, dlName); + DLMTestUtil.generateCompletedLogSegments(dlm, conf, 3, 3); BKLogReadHandler readHandler = dlm.createReadHandler(); Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord(); try { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java index b350255..8f86192 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java @@ -21,14 +21,14 @@ import com.twitter.distributedlog.exceptions.BKTransmitException; import com.twitter.distributedlog.exceptions.EndOfStreamException; import com.twitter.distributedlog.exceptions.WriteCancelledException; import com.twitter.distributedlog.exceptions.WriteException; +import com.twitter.distributedlog.impl.BKNamespaceDriver; import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter; import com.twitter.distributedlog.io.Abortables; import com.twitter.distributedlog.lock.SessionLockFactory; import com.twitter.distributedlog.lock.ZKDistributedLock; import com.twitter.distributedlog.lock.ZKSessionLockFactory; -import com.twitter.distributedlog.metadata.BKDLConfig; +import com.twitter.distributedlog.impl.metadata.BKDLConfig; import com.twitter.distributedlog.util.ConfUtils; -import com.twitter.distributedlog.util.DLUtils; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.PermitLimiter; @@ -96,7 +96,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { .dlConfig(conf) .name("test-bkc") .ledgersPath(bkdlConfig.getBkLedgersPath()) - .zkServers(DLUtils.getZKServersFromDLUri(uri)) + .zkServers(BKNamespaceDriver.getZKServersFromDLUri(uri)) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogWriteHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogWriteHandler.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogWriteHandler.java index 754f945..a0485bd 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogWriteHandler.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogWriteHandler.java @@ -19,6 +19,7 @@ package com.twitter.distributedlog; import com.twitter.distributedlog.bk.LedgerAllocator; import com.twitter.distributedlog.bk.LedgerAllocatorPool; +import com.twitter.distributedlog.impl.BKNamespaceDriver; import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; import com.twitter.distributedlog.util.FailpointUtils; import com.twitter.distributedlog.util.FutureUtils; @@ -75,7 +76,8 @@ public class TestBKLogWriteHandler extends TestDistributedLogBase { FailpointUtils.FailPointName.FP_StartLogSegmentOnAssignLogSegmentSequenceNumber); } - LedgerAllocator allocator = namespace.getLedgerAllocator(); + LedgerAllocator allocator = ((BKNamespaceDriver) namespace.getNamespaceDriver()) + .getLedgerAllocator(); assertTrue(allocator instanceof LedgerAllocatorPool); LedgerAllocatorPool allocatorPool = (LedgerAllocatorPool) allocator; assertEquals(0, allocatorPool.obtainMapSize()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java index a388b68..d850db4 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java @@ -19,11 +19,24 @@ package com.twitter.distributedlog; import static org.junit.Assert.assertTrue; +import com.google.common.base.Optional; +import com.google.common.base.Ticker; +import com.twitter.distributedlog.impl.BKNamespaceDriver; import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter; +import com.twitter.distributedlog.injector.AsyncFailureInjector; +import com.twitter.distributedlog.injector.AsyncRandomFailureInjector; +import com.twitter.distributedlog.io.AsyncCloseable; import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; +import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.namespace.DistributedLogNamespace; +import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; +import com.twitter.distributedlog.namespace.NamespaceDriver; +import com.twitter.distributedlog.util.ConfUtils; +import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.PermitLimiter; +import com.twitter.distributedlog.util.SchedulerUtils; +import com.twitter.util.Future; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.feature.SettableFeatureProvider; import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim; @@ -43,9 +56,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; public class TestDistributedLogBase { static final Logger LOG = LoggerFactory.getLogger(TestDistributedLogBase.class); @@ -87,6 +102,12 @@ public class TestDistributedLogBase { .build(); bkutil.start(); zkServers = "127.0.0.1:" + zkPort; + Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.warn("Uncaught exception at Thread {} : ", t.getName(), e); + } + }); } @AfterClass @@ -141,22 +162,7 @@ public class TestDistributedLogBase { public BKDistributedLogManager createNewDLM(DistributedLogConfiguration conf, String name) throws Exception { - URI uri = createDLMURI("/" + name); - ensureURICreated(uri); - return new BKDistributedLogManager( - name, - conf, - uri, - null, - null, - null, - null, - null, - null, - new SettableFeatureProvider("", 0), - PermitLimiter.NULL_PERMIT_LIMITER, - NullStatsLogger.INSTANCE - ); + return createNewDLM(conf, name, PermitLimiter.NULL_PERMIT_LIMITER); } public BKDistributedLogManager createNewDLM(DistributedLogConfiguration conf, @@ -165,48 +171,69 @@ public class TestDistributedLogBase { throws Exception { URI uri = createDLMURI("/" + name); ensureURICreated(uri); + final DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + .uri(uri) + .conf(conf) + .build(); + final OrderedScheduler scheduler = OrderedScheduler.newBuilder() + .corePoolSize(1) + .name("test-scheduler") + .build(); + AsyncCloseable resourcesCloseable = new AsyncCloseable() { + @Override + public Future<Void> asyncClose() { + LOG.info("Shutting down the scheduler"); + SchedulerUtils.shutdownScheduler(scheduler, 1, TimeUnit.SECONDS); + LOG.info("Shut down the scheduler"); + LOG.info("Closing the namespace"); + namespace.close(); + LOG.info("Closed the namespace"); + return Future.Void(); + } + }; + AsyncFailureInjector failureInjector = AsyncRandomFailureInjector.newBuilder() + .injectDelays(conf.getEIInjectReadAheadDelay(), + conf.getEIInjectReadAheadDelayPercent(), + conf.getEIInjectMaxReadAheadDelayMs()) + .injectErrors(false, 10) + .injectStops(conf.getEIInjectReadAheadStall(), 10) + .injectCorruption(conf.getEIInjectReadAheadBrokenEntries()) + .build(); return new BKDistributedLogManager( name, conf, + ConfUtils.getConstDynConf(conf), uri, - null, - null, - null, - null, - null, - null, - new SettableFeatureProvider("", 0), + namespace.getNamespaceDriver(), + new LogSegmentMetadataCache(conf, Ticker.systemTicker()), + scheduler, + DistributedLogConstants.UNKNOWN_CLIENT_ID, + DistributedLogConstants.LOCAL_REGION_ID, writeLimiter, - NullStatsLogger.INSTANCE - ); - } - - public DLMTestUtil.BKLogPartitionWriteHandlerAndClients createNewBKDLM( - DistributedLogConfiguration conf, - String path) throws Exception { - return DLMTestUtil.createNewBKDLM(conf, path, zkPort); + new SettableFeatureProvider("", 0), + failureInjector, + NullStatsLogger.INSTANCE, + NullStatsLogger.INSTANCE, + Optional.of(resourcesCloseable)); } - @SuppressWarnings("deprecation") - protected LogSegmentMetadataStore getLogSegmentMetadataStore(DistributedLogManagerFactory factory) { - DistributedLogNamespace namespace = factory.getNamespace(); - assertTrue(namespace instanceof BKDistributedLogNamespace); - return ((BKDistributedLogNamespace) namespace).getWriterStreamMetadataStore() + protected LogSegmentMetadataStore getLogSegmentMetadataStore(DistributedLogNamespace namespace) + throws IOException { + return namespace.getNamespaceDriver().getLogStreamMetadataStore(NamespaceDriver.Role.READER) .getLogSegmentMetadataStore(); } - @SuppressWarnings("deprecation") - protected ZooKeeperClient getZooKeeperClient(DistributedLogManagerFactory factory) throws Exception { - DistributedLogNamespace namespace = factory.getNamespace(); - assertTrue(namespace instanceof BKDistributedLogNamespace); - return ((BKDistributedLogNamespace) namespace).getSharedWriterZKCForDL(); + protected ZooKeeperClient getZooKeeperClient(DistributedLogNamespace namespace) throws Exception { + NamespaceDriver driver = namespace.getNamespaceDriver(); + assertTrue(driver instanceof BKNamespaceDriver); + return ((BKNamespaceDriver) driver).getWriterZKC(); } @SuppressWarnings("deprecation") - protected BookKeeperClient getBookKeeperClient(DistributedLogManagerFactory factory) throws Exception { - DistributedLogNamespace namespace = factory.getNamespace(); - assertTrue(namespace instanceof BKDistributedLogNamespace); - return ((BKDistributedLogNamespace) namespace).getReaderBKC(); + protected BookKeeperClient getBookKeeperClient(DistributedLogNamespace namespace) throws Exception { + NamespaceDriver driver = namespace.getNamespaceDriver(); + assertTrue(driver instanceof BKNamespaceDriver); + return ((BKNamespaceDriver) driver).getReaderBKC(); } protected LedgerHandle getLedgerHandle(BKLogSegmentWriter segmentWriter) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestFailureAndRecovery.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestFailureAndRecovery.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestFailureAndRecovery.java deleted file mode 100644 index e86e45a..0000000 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestFailureAndRecovery.java +++ /dev/null @@ -1,257 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.twitter.distributedlog.exceptions.BKTransmitException; -import com.twitter.distributedlog.io.Abortables; -import com.twitter.distributedlog.util.FutureUtils; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.proto.BookieServer; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class TestFailureAndRecovery extends TestDistributedLogBase { - static final Log LOG = LogFactory.getLog(TestFailureAndRecovery.class); - - @Test(timeout = 60000) - public void testSimpleRecovery() throws Exception { - DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlmAndClients = createNewBKDLM(conf, "distrlog-simplerecovery"); - BKLogSegmentWriter out = bkdlmAndClients.getWriteHandler().startLogSegment(1); - long txid = 1; - for (long i = 1; i <= 100; i++) { - LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); - out.write(op); - if ((i % 10) == 0) { - FutureUtils.result(out.flushAndCommit()); - } - - } - FutureUtils.result(out.flushAndCommit()); - - Abortables.abort(out, false); - FutureUtils.result(out.asyncClose()); - - assertNull(zkc.exists(bkdlmAndClients.getWriteHandler().completedLedgerZNode(1, 100, out.getLogSegmentSequenceNumber()), false)); - assertNotNull(zkc.exists(bkdlmAndClients.getWriteHandler().inprogressZNode(out.getLogSegmentId(), 1, out.getLogSegmentSequenceNumber()), false)); - - FutureUtils.result(bkdlmAndClients.getWriteHandler().recoverIncompleteLogSegments()); - - assertNotNull(zkc.exists(bkdlmAndClients.getWriteHandler().completedLedgerZNode(1, 100, out.getLogSegmentSequenceNumber()), false)); - assertNull(zkc.exists(bkdlmAndClients.getWriteHandler().inprogressZNode(out.getLogSegmentId(), 1, out.getLogSegmentSequenceNumber()), false)); - } - - /** - * Test that if enough bookies fail to prevent an ensemble, - * writes the bookkeeper will fail. Test that when once again - * an ensemble is available, it can continue to write. - */ - @Test(timeout = 60000) - public void testAllBookieFailure() throws Exception { - BookieServer bookieToFail = bkutil.newBookie(); - BookieServer replacementBookie = null; - - try { - int ensembleSize = numBookies + 1; - assertEquals("Begin: New bookie didn't start", - ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); - - // ensure that the journal manager has to use all bookies, - // so that a failure will fail the journal manager - DistributedLogConfiguration conf = new DistributedLogConfiguration(); - conf.setEnsembleSize(ensembleSize); - conf.setWriteQuorumSize(ensembleSize); - conf.setAckQuorumSize(ensembleSize); - long txid = 1; - DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlmAndClients = createNewBKDLM(conf, "distrlog-allbookiefailure"); - BKLogSegmentWriter out = bkdlmAndClients.getWriteHandler().startLogSegment(txid); - - for (long i = 1; i <= 3; i++) { - LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); - out.write(op); - } - FutureUtils.result(out.flushAndCommit()); - bookieToFail.shutdown(); - assertEquals("New bookie didn't die", - numBookies, bkutil.checkBookiesUp(numBookies, 10)); - - try { - for (long i = 1; i <= 3; i++) { - LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); - out.write(op); - txid++; - } - FutureUtils.result(out.flushAndCommit()); - fail("should not get to this stage"); - } catch (BKTransmitException bkte) { - LOG.debug("Error writing to bookkeeper", bkte); - assertEquals("Invalid exception message", - BKException.Code.NotEnoughBookiesException, bkte.getBKResultCode()); - } - replacementBookie = bkutil.newBookie(); - - assertEquals("Replacement: New bookie didn't start", - numBookies + 1, bkutil.checkBookiesUp(numBookies + 1, 10)); - out = bkdlmAndClients.getWriteHandler().startLogSegment(txid); - for (long i = 1; i <= 3; i++) { - LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); - out.write(op); - } - - FutureUtils.result(out.flushAndCommit()); - } catch (Exception e) { - LOG.error("Exception in test", e); - throw e; - } finally { - if (replacementBookie != null) { - replacementBookie.shutdown(); - } - bookieToFail.shutdown(); - - if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) { - LOG.warn("Not all bookies from this test shut down, expect errors"); - } - } - } - - /** - * Test that a BookKeeper JM can continue to work across the - * failure of a bookie. This should be handled transparently - * by bookkeeper. - */ - @Test(timeout = 60000) - public void testOneBookieFailure() throws Exception { - BookieServer bookieToFail = bkutil.newBookie(); - BookieServer replacementBookie = null; - - try { - int ensembleSize = numBookies + 1; - assertEquals("New bookie didn't start", - ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); - - // ensure that the journal manager has to use all bookies, - // so that a failure will fail the journal manager - DistributedLogConfiguration conf = new DistributedLogConfiguration(); - conf.setEnsembleSize(ensembleSize); - conf.setWriteQuorumSize(ensembleSize); - conf.setAckQuorumSize(ensembleSize); - long txid = 1; - DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlmAndClients = createNewBKDLM(conf, "distrlog-onebookiefailure"); - BKLogSegmentWriter out = bkdlmAndClients.getWriteHandler().startLogSegment(txid); - for (long i = 1; i <= 3; i++) { - LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); - out.write(op); - } - FutureUtils.result(out.flushAndCommit()); - - replacementBookie = bkutil.newBookie(); - assertEquals("replacement bookie didn't start", - ensembleSize + 1, bkutil.checkBookiesUp(ensembleSize + 1, 10)); - bookieToFail.shutdown(); - assertEquals("New bookie didn't die", - ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); - - for (long i = 1; i <= 3; i++) { - LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); - out.write(op); - } - FutureUtils.result(out.flushAndCommit()); - } catch (Exception e) { - LOG.error("Exception in test", e); - throw e; - } finally { - if (replacementBookie != null) { - replacementBookie.shutdown(); - } - bookieToFail.shutdown(); - - if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) { - LOG.warn("Not all bookies from this test shut down, expect errors"); - } - } - } - - @Test(timeout = 60000) - public void testRecoveryEmptyLedger() throws Exception { - DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlmAndClients = createNewBKDLM(conf, "distrlog-recovery-empty-ledger"); - BKLogSegmentWriter out = bkdlmAndClients.getWriteHandler().startLogSegment(1); - long txid = 1; - for (long i = 1; i <= 100; i++) { - LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); - out.write(op); - if ((i % 10) == 0) { - FutureUtils.result(out.flushAndCommit()); - } - - } - FutureUtils.result(out.flushAndCommit()); - FutureUtils.result(out.asyncClose()); - bkdlmAndClients.getWriteHandler().completeAndCloseLogSegment(out.getLogSegmentSequenceNumber(), out.getLogSegmentId(), 1, 100, 100); - assertNotNull(zkc.exists(bkdlmAndClients.getWriteHandler().completedLedgerZNode(1, 100, out.getLogSegmentSequenceNumber()), false)); - BKLogSegmentWriter outEmpty = bkdlmAndClients.getWriteHandler().startLogSegment(101); - Abortables.abort(outEmpty, false); - - assertNull(zkc.exists(bkdlmAndClients.getWriteHandler().completedLedgerZNode(101, 101, outEmpty.getLogSegmentSequenceNumber()), false)); - assertNotNull(zkc.exists(bkdlmAndClients.getWriteHandler().inprogressZNode(outEmpty.getLogSegmentId(), 101, outEmpty.getLogSegmentSequenceNumber()), false)); - - FutureUtils.result(bkdlmAndClients.getWriteHandler().recoverIncompleteLogSegments()); - - assertNull(zkc.exists(bkdlmAndClients.getWriteHandler().inprogressZNode(outEmpty.getLogSegmentId(), outEmpty.getLogSegmentSequenceNumber(), 101), false)); - assertNotNull(zkc.exists(bkdlmAndClients.getWriteHandler().completedLedgerZNode(101, 101, outEmpty.getLogSegmentSequenceNumber()), false)); - } - - @Test(timeout = 60000) - public void testRecoveryAPI() throws Exception { - DistributedLogManager dlm = createNewDLM(conf, "distrlog-recovery-api"); - BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); - long txid = 1; - for (long i = 1; i <= 100; i++) { - LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); - out.write(op); - if ((i % 10) == 0) { - out.setReadyToFlush(); - out.flushAndSync(); - } - - } - BKLogSegmentWriter perStreamLogWriter = out.getCachedLogWriter(); - out.setReadyToFlush(); - out.flushAndSync(); - - out.abort(); - - BKLogWriteHandler blplm1 = ((BKDistributedLogManager) (dlm)).createWriteHandler(true); - - assertNull(zkc.exists(blplm1.completedLedgerZNode(1, 100, - perStreamLogWriter.getLogSegmentSequenceNumber()), false)); - assertNotNull(zkc.exists(blplm1.inprogressZNode(perStreamLogWriter.getLogSegmentId(), 1, - perStreamLogWriter.getLogSegmentSequenceNumber()), false)); - - dlm.recover(); - - assertNotNull(zkc.exists(blplm1.completedLedgerZNode(1, 100, - perStreamLogWriter.getLogSegmentSequenceNumber()), false)); - assertNull(zkc.exists(blplm1.inprogressZNode(perStreamLogWriter.getLogSegmentId(), 1, - perStreamLogWriter.getLogSegmentSequenceNumber()), false)); - FutureUtils.result(blplm1.asyncClose()); - assertEquals(100, dlm.getLogRecordCount()); - dlm.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestInterleavedReaders.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestInterleavedReaders.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestInterleavedReaders.java index 3cdd676..830e059 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestInterleavedReaders.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestInterleavedReaders.java @@ -22,8 +22,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URI; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -337,72 +335,4 @@ public class TestInterleavedReaders extends TestDistributedLogBase { dlmreader1.close(); } - @Test(timeout = 60000) - public void testFactorySharedClients() throws Exception { - String name = "distrlog-factorysharedclients"; - testFactory(name, true); - } - - @Test(timeout = 60000) - public void testFactorySharedZK() throws Exception { - String name = "distrlog-factorysharedZK"; - testFactory(name, false); - } - - @SuppressWarnings("deprecation") - private void testFactory(String name, boolean shareBK) throws Exception { - int count = 3; - URI uri = createDLMURI("/" + name); - ensureURICreated(uri); - BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder() - .conf(conf).uri(uri).build(); - DistributedLogManager[] dlms = new DistributedLogManager[count]; - for (int s = 0; s < count; s++) { - if (shareBK) { - dlms[s] = namespace.createDistributedLogManager(name + String.format("%d", s), - DistributedLogManagerFactory.ClientSharingOption.SharedClients); - } else { - dlms[s] = namespace.createDistributedLogManager(name + String.format("%d", s), - DistributedLogManagerFactory.ClientSharingOption.SharedZKClientPerStreamBKClient); - } - } - - int txid = 1; - for (long i = 0; i < 3; i++) { - BKSyncLogWriter[] writers = new BKSyncLogWriter[count]; - for (int s = 0; s < count; s++) { - writers[s] = (BKSyncLogWriter)(dlms[s].startLogSegmentNonPartitioned()); - } - - for (long j = 0; j < 1; j++) { - final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); - for (int s = 0; s < count; s++) { - writers[s].write(record); - } - } - for (int s = 0; s < count; s++) { - writers[s].closeAndComplete(); - } - - if (i < 2) { - // Restart the zeroth stream and make sure that the other streams can - // continue without restart - dlms[0].close(); - if (shareBK) { - dlms[0] = namespace.createDistributedLogManager(name + String.format("%d", 0), - DistributedLogManagerFactory.ClientSharingOption.SharedClients); - } else { - dlms[0] = namespace.createDistributedLogManager(name + String.format("%d", 0), - DistributedLogManagerFactory.ClientSharingOption.SharedZKClientPerStreamBKClient); - } - } - - } - - for (int s = 0; s < count; s++) { - dlms[s].close(); - } - - namespace.close(); - } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java index bb67214..06c7bba 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestLogSegmentsZK.java @@ -80,10 +80,10 @@ public class TestLogSegmentsZK extends TestDistributedLogBase { .setImmediateFlushEnabled(true) .setEnableLedgerAllocatorPool(true) .setLedgerAllocatorPoolName("test"); - BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder().conf(conf).uri(uri).build(); + DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build(); namespace.createLog(streamName); - MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf); + MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf); assertEquals(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO, max1.getSequenceNumber()); DistributedLogManager dlm = namespace.openLog(streamName); final int numSegments = 3; @@ -92,7 +92,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase { out.write(DLMTestUtil.getLogRecordInstance(i)); out.closeAndComplete(); } - MaxLogSegmentSequenceNo max2 = getMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf); + MaxLogSegmentSequenceNo max2 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf); assertEquals(3, max2.getSequenceNumber()); dlm.close(); namespace.close(); @@ -111,10 +111,10 @@ public class TestLogSegmentsZK extends TestDistributedLogBase { .setImmediateFlushEnabled(true) .setEnableLedgerAllocatorPool(true) .setLedgerAllocatorPoolName("test"); - BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder().conf(conf).uri(uri).build(); + DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build(); namespace.createLog(streamName); - MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf); + MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf); assertEquals(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO, max1.getSequenceNumber()); DistributedLogManager dlm = namespace.openLog(streamName); final int numSegments = 3; @@ -123,11 +123,11 @@ public class TestLogSegmentsZK extends TestDistributedLogBase { out.write(DLMTestUtil.getLogRecordInstance(i)); out.closeAndComplete(); } - MaxLogSegmentSequenceNo max2 = getMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf); + MaxLogSegmentSequenceNo max2 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf); assertEquals(3, max2.getSequenceNumber()); // nuke the max ledger sequence number - updateMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf, new byte[0]); + updateMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf, new byte[0]); DistributedLogManager dlm1 = namespace.openLog(streamName); try { dlm1.startLogSegmentNonPartitioned(); @@ -139,7 +139,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase { } // invalid max ledger sequence number - updateMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf, "invalid-max".getBytes(UTF_8)); + updateMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf, "invalid-max".getBytes(UTF_8)); DistributedLogManager dlm2 = namespace.openLog(streamName); try { dlm2.startLogSegmentNonPartitioned(); @@ -167,10 +167,10 @@ public class TestLogSegmentsZK extends TestDistributedLogBase { .setImmediateFlushEnabled(true) .setEnableLedgerAllocatorPool(true) .setLedgerAllocatorPoolName("test"); - BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder().conf(conf).uri(uri).build(); + DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build(); namespace.createLog(streamName); - MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf); + MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf); assertEquals(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO, max1.getSequenceNumber()); DistributedLogManager dlm = namespace.openLog(streamName); final int numSegments = 3; @@ -179,11 +179,11 @@ public class TestLogSegmentsZK extends TestDistributedLogBase { out.write(DLMTestUtil.getLogRecordInstance(i)); out.closeAndComplete(); } - MaxLogSegmentSequenceNo max2 = getMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf); + MaxLogSegmentSequenceNo max2 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf); assertEquals(3, max2.getSequenceNumber()); // update the max ledger sequence number - updateMaxLogSegmentSequenceNo(namespace.getSharedWriterZKCForDL(), uri, streamName, conf, + updateMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf, DLUtils.serializeLogSegmentSequenceNumber(99)); DistributedLogManager dlm1 = namespace.openLog(streamName); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java index e322234..9553637 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java @@ -54,6 +54,7 @@ public class TestNonBlockingReads extends TestDistributedLogBase { confLocal.setReadAheadBatchSize(1); confLocal.setReadAheadMaxRecords(1); confLocal.setReaderIdleWarnThresholdMillis(100); + confLocal.setReadLACLongPollTimeout(49); final DistributedLogManager dlm = createNewDLM(confLocal, name); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); ScheduledFuture writerClosedFuture = null; @@ -129,6 +130,7 @@ public class TestNonBlockingReads extends TestDistributedLogBase { confLocal.loadConf(conf); confLocal.setReadAheadBatchSize(1); confLocal.setReadAheadMaxRecords(1); + confLocal.setReadLACLongPollTimeout(24); confLocal.setReaderIdleWarnThresholdMillis(50); confLocal.setReaderIdleErrorThresholdMillis(100); final DistributedLogManager dlm = createNewDLM(confLocal, name); @@ -174,6 +176,7 @@ public class TestNonBlockingReads extends TestDistributedLogBase { confLocal.loadConf(conf); confLocal.setReadAheadBatchSize(1); confLocal.setReadAheadMaxRecords(3); + confLocal.setReadLACLongPollTimeout(249); confLocal.setReaderIdleWarnThresholdMillis(500); confLocal.setReaderIdleErrorThresholdMillis(30000); final DistributedLogManager dlm = createNewDLM(confLocal, name); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java index 74a5231..cf4fc4f 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java @@ -25,6 +25,7 @@ import com.twitter.distributedlog.exceptions.DLIllegalStateException; import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore; import com.twitter.distributedlog.injector.AsyncFailureInjector; import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; +import com.twitter.distributedlog.util.ConfUtils; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.Utils; @@ -38,6 +39,7 @@ import org.junit.Test; import org.junit.rules.TestName; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; @@ -54,6 +56,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { private DistributedLogConfiguration baseConf; private OrderedScheduler scheduler; private BookKeeperClient bkc; + private ZooKeeperClient zkc; @Before public void setup() throws Exception { @@ -66,6 +69,12 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { baseConf.setReadAheadMaxRecords(MAX_CACHED_ENTRIES); baseConf.setNumPrefetchEntriesPerLogSegment(NUM_PREFETCH_ENTRIES); baseConf.setMaxPrefetchEntriesPerLogSegment(NUM_PREFETCH_ENTRIES); + zkc = ZooKeeperClientBuilder.newBuilder() + .name("test-zk") + .zkServers(bkutil.getZkServers()) + .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) + .zkAclId(conf.getZkAclId()) + .build(); bkc = BookKeeperClientBuilder.newBuilder() .name("test-bk") .dlConfig(conf) @@ -86,6 +95,9 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { if (null != scheduler) { scheduler.shutdown(); } + if (null != zkc) { + zkc.close(); + } super.teardown(); } @@ -99,8 +111,11 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { true); LogSegmentEntryStore entryStore = new BKLogSegmentEntryStore( conf, + ConfUtils.getConstDynConf(conf), + zkc, bkc, scheduler, + null, NullStatsLogger.INSTANCE, AsyncFailureInjector.NULL); return new ReadAheadEntryReader( @@ -309,7 +324,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { BKDistributedLogManager dlm = createNewDLM(baseConf, streamName); // generate list of log segments - generateCompletedLogSegments(dlm, 3, 2); + generateCompletedLogSegments(dlm, 3, 3); AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter()); FutureUtils.result(writer.truncate(new DLSN(2L, 1L, 0L))); @@ -321,23 +336,39 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { readAheadEntryReader.start(segments); // ensure initialization to complete ensureOrderSchedulerEmpty(streamName); - expectAlreadyTruncatedTransactionException(readAheadEntryReader, - "should fail on positioning to a truncated log segment"); + expectNoException(readAheadEntryReader); + Entry.Reader entryReader = + readAheadEntryReader.getNextReadAheadEntry(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + assertEquals(2L, entryReader.getLSSN()); + assertEquals(1L, entryReader.getEntryId()); + Utils.close(readAheadEntryReader); // positioning on a partially truncated log segment (segment 2) before min active dlsn readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 0L, 0L), dlm, baseConf); readAheadEntryReader.start(segments); // ensure initialization to complete ensureOrderSchedulerEmpty(streamName); - expectAlreadyTruncatedTransactionException(readAheadEntryReader, - "should fail on positioning to a partially truncated log segment"); + expectNoException(readAheadEntryReader); + entryReader = + readAheadEntryReader.getNextReadAheadEntry(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + assertEquals(2L, entryReader.getLSSN()); + assertEquals(1L, entryReader.getEntryId()); + Utils.close(readAheadEntryReader); // positioning on a partially truncated log segment (segment 2) after min active dlsn - readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 1L, 0L), dlm, baseConf); + readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 2L, 0L), dlm, baseConf); readAheadEntryReader.start(segments); // ensure initialization to complete ensureOrderSchedulerEmpty(streamName); expectNoException(readAheadEntryReader); + entryReader = + readAheadEntryReader.getNextReadAheadEntry(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + assertEquals(2L, entryReader.getLSSN()); + assertEquals(2L, entryReader.getEntryId()); + Utils.close(readAheadEntryReader); + + Utils.close(writer); + dlm.close(); } @Test(timeout = 60000) @@ -363,6 +394,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { // ensure initialization to complete ensureOrderSchedulerEmpty(streamName); expectNoException(readAheadEntryReader); + Utils.close(readAheadEntryReader); // positioning on a partially truncated log segment (segment 2) before min active dlsn readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 0L, 0L), dlm, confLocal); @@ -370,6 +402,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { // ensure initialization to complete ensureOrderSchedulerEmpty(streamName); expectNoException(readAheadEntryReader); + Utils.close(readAheadEntryReader); // positioning on a partially truncated log segment (segment 2) after min active dlsn readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 1L, 0L), dlm, confLocal); @@ -377,6 +410,10 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { // ensure initialization to complete ensureOrderSchedulerEmpty(streamName); expectNoException(readAheadEntryReader); + Utils.close(readAheadEntryReader); + + Utils.close(writer); + dlm.close(); } // @@ -418,6 +455,9 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { ensureOrderSchedulerEmpty(streamName); expectIllegalStateException(readAheadEntryReader, "inconsistent log segment found"); + + Utils.close(readAheadEntryReader); + dlm.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java index 99ef041..b183b84 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java @@ -331,6 +331,7 @@ public class TestRollLogSegments extends TestDistributedLogBase { @FlakyTest @Test(timeout = 60000) + @SuppressWarnings("deprecation") public void testCaughtUpReaderOnLogSegmentRolling() throws Exception { String name = "distrlog-caughtup-reader-on-logsegment-rolling"; @@ -344,6 +345,8 @@ public class TestRollLogSegments extends TestDistributedLogBase { confLocal.setWriteQuorumSize(1); confLocal.setAckQuorumSize(1); confLocal.setReadLACLongPollTimeout(99999999); + confLocal.setReaderIdleWarnThresholdMillis(2 * 99999999 + 1); + confLocal.setBKClientReadTimeout(99999999 + 1); DistributedLogManager dlm = createNewDLM(confLocal, name); BKSyncLogWriter writer = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); @@ -368,7 +371,7 @@ public class TestRollLogSegments extends TestDistributedLogBase { } BKLogSegmentWriter perStreamWriter = writer.segmentWriter; - BookKeeperClient bkc = readDLM.getReaderBKC(); + BookKeeperClient bkc = DLMTestUtil.getBookKeeperClient(readDLM); LedgerHandle readLh = bkc.get().openLedgerNoRecovery(getLedgerHandle(perStreamWriter).getId(), BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8)); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControl.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControl.java index ee2968d..ff924f8 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControl.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControl.java @@ -20,6 +20,7 @@ package com.twitter.distributedlog.acl; import com.twitter.distributedlog.TestZooKeeperClientBuilder; import com.twitter.distributedlog.ZooKeeperClient; import com.twitter.distributedlog.ZooKeeperClusterTestCase; +import com.twitter.distributedlog.impl.acl.ZKAccessControl; import com.twitter.distributedlog.thrift.AccessControlEntry; import com.twitter.util.Await; import org.apache.zookeeper.CreateMode; @@ -103,7 +104,7 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase { ZKAccessControl readZKAC = Await.result(ZKAccessControl.read(zkc, zkPath, null)); - assertEquals(zkPath, readZKAC.zkPath); + assertEquals(zkPath, readZKAC.getZKPath()); assertEquals(ZKAccessControl.DEFAULT_ACCESS_CONTROL_ENTRY, readZKAC.getAccessControlEntry()); assertTrue(ZKAccessControl.DEFAULT_ACCESS_CONTROL_ENTRY == readZKAC.getAccessControlEntry()); } @@ -145,7 +146,7 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase { } catch (KeeperException.BadVersionException bve) { // expected } - readZKAC2.accessControlEntry.setDenyTruncate(true); + readZKAC2.getAccessControlEntry().setDenyTruncate(true); Await.result(readZKAC2.update(zkc)); ZKAccessControl readZKAC3 = Await.result(ZKAccessControl.read(zkc, zkPath, null)); assertEquals(readZKAC2, readZKAC3); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControlManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControlManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControlManager.java index 8ba82f5..5625306 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControlManager.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/acl/TestZKAccessControlManager.java @@ -22,6 +22,8 @@ import com.twitter.distributedlog.TestZooKeeperClientBuilder; import com.twitter.distributedlog.ZooKeeperClient; import com.twitter.distributedlog.ZooKeeperClientUtils; import com.twitter.distributedlog.ZooKeeperClusterTestCase; +import com.twitter.distributedlog.impl.acl.ZKAccessControl; +import com.twitter.distributedlog.impl.acl.ZKAccessControlManager; import com.twitter.distributedlog.thrift.AccessControlEntry; import com.twitter.util.Await; import org.junit.After;