http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java index ccbfc44..4ad0bc0 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java @@ -17,24 +17,23 @@ */ package org.apache.distributedlog; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.exceptions.BKTransmitException; import org.apache.distributedlog.exceptions.EndOfStreamException; import org.apache.distributedlog.exceptions.WriteCancelledException; import org.apache.distributedlog.exceptions.WriteException; +import org.apache.distributedlog.exceptions.ZKException; import org.apache.distributedlog.impl.BKNamespaceDriver; import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryWriter; -import org.apache.distributedlog.io.Abortables; import org.apache.distributedlog.lock.SessionLockFactory; import org.apache.distributedlog.lock.ZKDistributedLock; import org.apache.distributedlog.lock.ZKSessionLockFactory; import org.apache.distributedlog.impl.metadata.BKDLConfig; import org.apache.distributedlog.util.ConfUtils; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; -import org.apache.distributedlog.util.PermitLimiter; +import org.apache.distributedlog.common.util.PermitLimiter; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Await; -import com.twitter.util.Future; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; @@ -42,14 +41,12 @@ import org.apache.bookkeeper.feature.SettableFeatureProvider; import org.apache.bookkeeper.stats.AlertStatsLogger; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; -import scala.runtime.AbstractFunction0; import java.io.IOException; import java.net.URI; @@ -129,9 +126,9 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { boolean acquireLock) throws Exception { try { - Await.result(Utils.zkAsyncCreateFullPathOptimistic(zkClient, path, new byte[0], + Utils.ioResult(Utils.zkAsyncCreateFullPathOptimistic(zkClient, path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - } catch (KeeperException.NodeExistsException nee) { + } catch (ZKException zke) { // node already exists } SessionLockFactory lockFactory = new ZKSessionLockFactory( @@ -150,7 +147,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { Long.MAX_VALUE, NullStatsLogger.INSTANCE); if (acquireLock) { - return FutureUtils.result(lock.asyncAcquire()); + return Utils.ioResult(lock.asyncAcquire()); } else { return lock; } @@ -158,9 +155,9 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { private void closeWriterAndLock(BKLogSegmentWriter writer, ZKDistributedLock lock) - throws IOException { + throws Exception { try { - FutureUtils.result(writer.asyncClose()); + Utils.ioResult(writer.asyncClose()); } finally { Utils.closeQuietly(lock); } @@ -170,7 +167,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { ZKDistributedLock lock) throws IOException { try { - Abortables.abort(writer, false); + Utils.abort(writer, false); } finally { Utils.closeQuietly(lock); } @@ -231,10 +228,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { createLogSegmentWriter(confLocal, 0L, -1L, lock); // Use another lock to wait for writer releasing lock ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false); - Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire(); + CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire(); // add 10 records int numRecords = 10; - List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords); + List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords); for (int i = 0; i < numRecords; i++) { futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i))); } @@ -248,7 +245,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { 10, writer.getPositionWithinLogSegment()); // close the writer should flush buffered data and release lock closeWriterAndLock(writer, lock); - Await.result(lockFuture0); + Utils.ioResult(lockFuture0); lock0.checkOwnership(); assertEquals("Last tx id should still be " + (numRecords - 1), numRecords - 1, writer.getLastTxId()); @@ -256,7 +253,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { numRecords - 1, writer.getLastTxIdAcknowledged()); assertEquals("Position should still be " + numRecords, 10, writer.getPositionWithinLogSegment()); - List<DLSN> dlsns = Await.result(Future.collect(futureList)); + List<DLSN> dlsns = Utils.ioResult(FutureUtils.collect(futureList)); assertEquals("All records should be written", numRecords, dlsns.size()); for (int i = 0; i < numRecords; i++) { @@ -293,10 +290,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { createLogSegmentWriter(confLocal, 0L, -1L, lock); // Use another lock to wait for writer releasing lock ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false); - Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire(); + CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire(); // add 10 records int numRecords = 10; - List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords); + List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords); for (int i = 0; i < numRecords; i++) { futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i))); } @@ -310,7 +307,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { 10, writer.getPositionWithinLogSegment()); // close the writer should flush buffered data and release lock abortWriterAndLock(writer, lock); - Await.result(lockFuture0); + Utils.ioResult(lockFuture0); lock0.checkOwnership(); assertEquals("Last tx id should still be " + (numRecords - 1), numRecords - 1, writer.getLastTxId()); @@ -323,7 +320,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { for (int i = 0; i < numRecords; i++) { try { - Await.result(futureList.get(i)); + Utils.ioResult(futureList.get(i)); fail("Should be aborted record " + i + " with transmit exception"); } catch (WriteCancelledException wce) { assertTrue("Record " + i + " should be aborted because of ledger fenced", @@ -369,10 +366,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { createLogSegmentWriter(confLocal, 0L, -1L, lock); // Use another lock to wait for writer releasing lock ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false); - Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire(); + CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire(); // add 10 records int numRecords = 10; - List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords); + List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords); for (int i = 0; i < numRecords; i++) { futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i))); } @@ -393,7 +390,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { assertEquals("Inconsistent rc is thrown", rcToFailComplete, bkte.getBKResultCode()); } - Await.result(lockFuture0); + Utils.ioResult(lockFuture0); lock0.checkOwnership(); assertEquals("Last tx id should still be " + (numRecords - 1), numRecords - 1, writer.getLastTxId()); @@ -406,7 +403,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { for (int i = 0; i < numRecords; i++) { try { - Await.result(futureList.get(i)); + Utils.ioResult(futureList.get(i)); fail("Should be aborted record " + i + " with transmit exception"); } catch (WriteCancelledException wce) { assertTrue("Record " + i + " should be aborted because of ledger fenced", @@ -441,10 +438,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { createLogSegmentWriter(confLocal, 0L, -1L, lock); // Use another lock to wait for writer releasing lock ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false); - Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire(); + CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire(); // add 10 records int numRecords = 10; - List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords); + List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords); for (int i = 0; i < numRecords; i++) { futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i))); } @@ -467,7 +464,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { BKException.Code.LedgerFencedException, bkte.getBKResultCode()); } - Await.result(lockFuture0); + Utils.ioResult(lockFuture0); lock0.checkOwnership(); assertEquals("Last tx id should still be " + (numRecords - 1), @@ -481,7 +478,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { for (int i = 0; i < numRecords; i++) { try { - Await.result(futureList.get(i)); + Utils.ioResult(futureList.get(i)); fail("Should be aborted record " + i + " with transmit exception"); } catch (BKTransmitException bkte) { assertEquals("Record " + i + " should be aborted", @@ -513,10 +510,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { createLogSegmentWriter(confLocal, 0L, -1L, lock); // Use another lock to wait for writer releasing lock ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false); - Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire(); + CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire(); // add 10 records int numRecords = 10; - List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords); + List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords); for (int i = 0; i < numRecords; i++) { futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i))); } @@ -530,23 +527,19 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { numRecords, writer.getPositionWithinLogSegment()); final CountDownLatch deferLatch = new CountDownLatch(1); - writer.getFuturePool().apply(new AbstractFunction0<Object>() { - @Override - public Object apply() { - try { - deferLatch.await(); - } catch (InterruptedException e) { - LOG.warn("Interrupted on deferring completion : ", e); - } - return null; + writer.getFuturePool().submit(() -> { + try { + deferLatch.await(); + } catch (InterruptedException e) { + LOG.warn("Interrupted on deferring completion : ", e); } }); // transmit the buffered data - FutureUtils.result(writer.flush()); + Utils.ioResult(writer.flush()); // add another 10 records - List<Future<DLSN>> anotherFutureList = new ArrayList<Future<DLSN>>(numRecords); + List<CompletableFuture<DLSN>> anotherFutureList = new ArrayList<CompletableFuture<DLSN>>(numRecords); for (int i = numRecords; i < 2 * numRecords; i++) { anotherFutureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i))); } @@ -562,13 +555,13 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { // abort the writer: it waits for outstanding transmits and abort buffered data abortWriterAndLock(writer, lock); - Await.result(lockFuture0); + Utils.ioResult(lockFuture0); lock0.checkOwnership(); // release defer latch so completion would go through deferLatch.countDown(); - List<DLSN> dlsns = Await.result(Future.collect(futureList)); + List<DLSN> dlsns = Utils.ioResult(FutureUtils.collect(futureList)); assertEquals("All first 10 records should be written", numRecords, dlsns.size()); for (int i = 0; i < numRecords; i++) { @@ -582,7 +575,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { } for (int i = 0; i < numRecords; i++) { try { - Await.result(anotherFutureList.get(i)); + Utils.ioResult(anotherFutureList.get(i)); fail("Should be aborted record " + (numRecords + i) + " with transmit exception"); } catch (WriteCancelledException wce) { // writes should be cancelled. @@ -622,7 +615,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { createLogSegmentWriter(confLocal, 0L, -1L, lock); // add 10 records int numRecords = 10; - List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords); + List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords); for (int i = 0; i < numRecords; i++) { futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i))); } @@ -639,7 +632,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { // close the writer to flush the output buffer closeWriterAndLock(writer, lock); - List<DLSN> dlsns = Await.result(Future.collect(futureList)); + List<DLSN> dlsns = Utils.ioResult(FutureUtils.collect(futureList)); assertEquals("All 11 records should be written", numRecords + 1, dlsns.size()); for (int i = 0; i < numRecords; i++) { @@ -687,10 +680,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { // close the writer closeWriterAndLock(writer, lock); - FutureUtils.result(writer.asyncClose()); + Utils.ioResult(writer.asyncClose()); try { - Await.result(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1))); + Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1))); fail("Should fail the write if the writer is closed"); } catch (WriteException we) { // expected @@ -713,10 +706,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { BKLogSegmentWriter writer = createLogSegmentWriter(confLocal, 0L, -1L, lock); - FutureUtils.result(writer.markEndOfStream()); + Utils.ioResult(writer.markEndOfStream()); try { - Await.result(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1))); + Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1))); fail("Should fail the write if the writer is marked as end of stream"); } catch (EndOfStreamException we) { // expected @@ -747,7 +740,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { LogRecord record = DLMTestUtil.getLogRecordInstance(1); record.setControl(); try { - Await.result(writer.asyncWrite(record)); + Utils.ioResult(writer.asyncWrite(record)); fail("Should fail the writer if the log segment is already fenced"); } catch (BKTransmitException bkte) { // expected @@ -755,7 +748,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { } try { - Await.result(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2))); + Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2))); fail("Should fail the writer if the log segment is already fenced"); } catch (WriteException we) { // expected @@ -781,7 +774,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase { createLogSegmentWriter(confLocal, 0L, -1L, lock); assertEquals(DLSN.InvalidDLSN, - Await.result(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2)))); + Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2)))); assertEquals(-1L, ((BKLogSegmentEntryWriter) writer.getEntryWriter()) .getLedgerHandle().getLastAddPushed());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java index 2566d34..c0f208f 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java @@ -17,12 +17,13 @@ */ package org.apache.distributedlog; +import org.apache.distributedlog.api.AsyncLogWriter; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.bk.LedgerAllocator; import org.apache.distributedlog.bk.LedgerAllocatorPool; import org.apache.distributedlog.impl.BKNamespaceDriver; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.util.FailpointUtils; -import org.apache.distributedlog.util.FutureUtils; import org.apache.distributedlog.util.Utils; import org.junit.Rule; import org.junit.Test; @@ -58,7 +59,7 @@ public class TestBKLogWriteHandler extends TestDistributedLogBase { confLocal.setLedgerAllocatorPoolName("test-allocator-pool"); BKDistributedLogNamespace namespace = (BKDistributedLogNamespace) - DistributedLogNamespaceBuilder.newBuilder() + NamespaceBuilder.newBuilder() .conf(confLocal) .uri(uri) .build(); @@ -66,8 +67,8 @@ public class TestBKLogWriteHandler extends TestDistributedLogBase { FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_StartLogSegmentOnAssignLogSegmentSequenceNumber, FailpointUtils.FailPointActions.FailPointAction_Throw); try { - AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter()); - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L))); + AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter()); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L))); fail("Should fail opening the writer"); } catch (IOException ioe) { // expected @@ -82,7 +83,7 @@ public class TestBKLogWriteHandler extends TestDistributedLogBase { LedgerAllocatorPool allocatorPool = (LedgerAllocatorPool) allocator; assertEquals(0, allocatorPool.obtainMapSize()); - AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter()); + AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter()); writer.write(DLMTestUtil.getLogRecordInstance(1L)); Utils.close(writer); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java index bb8503f..07f0db5 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java @@ -17,6 +17,8 @@ */ package org.apache.distributedlog; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; import org.apache.distributedlog.exceptions.LogNotFoundException; import org.junit.Rule; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java index a766d3e..5e4ba07 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java @@ -21,6 +21,9 @@ import static org.junit.Assert.assertTrue; import com.google.common.base.Optional; import com.google.common.base.Ticker; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.api.LogReader; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.impl.BKNamespaceDriver; import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryWriter; import org.apache.distributedlog.injector.AsyncFailureInjector; @@ -29,14 +32,13 @@ import org.apache.distributedlog.io.AsyncCloseable; import org.apache.distributedlog.logsegment.LogSegmentEntryWriter; import org.apache.distributedlog.logsegment.LogSegmentMetadataCache; import org.apache.distributedlog.logsegment.LogSegmentMetadataStore; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.namespace.NamespaceDriver; import org.apache.distributedlog.util.ConfUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; -import org.apache.distributedlog.util.PermitLimiter; -import org.apache.distributedlog.util.SchedulerUtils; -import com.twitter.util.Future; +import org.apache.distributedlog.common.util.PermitLimiter; +import org.apache.distributedlog.common.util.SchedulerUtils; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.feature.SettableFeatureProvider; import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim; @@ -171,7 +173,7 @@ public class TestDistributedLogBase { throws Exception { URI uri = createDLMURI("/" + name); ensureURICreated(uri); - final DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + final Namespace namespace = NamespaceBuilder.newBuilder() .uri(uri) .conf(conf) .build(); @@ -181,14 +183,14 @@ public class TestDistributedLogBase { .build(); AsyncCloseable resourcesCloseable = new AsyncCloseable() { @Override - public Future<Void> asyncClose() { + public CompletableFuture<Void> asyncClose() { LOG.info("Shutting down the scheduler"); SchedulerUtils.shutdownScheduler(scheduler, 1, TimeUnit.SECONDS); LOG.info("Shut down the scheduler"); LOG.info("Closing the namespace"); namespace.close(); LOG.info("Closed the namespace"); - return Future.Void(); + return FutureUtils.Void(); } }; AsyncFailureInjector failureInjector = AsyncRandomFailureInjector.newBuilder() @@ -217,20 +219,20 @@ public class TestDistributedLogBase { Optional.of(resourcesCloseable)); } - protected LogSegmentMetadataStore getLogSegmentMetadataStore(DistributedLogNamespace namespace) + protected LogSegmentMetadataStore getLogSegmentMetadataStore(Namespace namespace) throws IOException { return namespace.getNamespaceDriver().getLogStreamMetadataStore(NamespaceDriver.Role.READER) .getLogSegmentMetadataStore(); } - protected ZooKeeperClient getZooKeeperClient(DistributedLogNamespace namespace) throws Exception { + protected ZooKeeperClient getZooKeeperClient(Namespace namespace) throws Exception { NamespaceDriver driver = namespace.getNamespaceDriver(); assertTrue(driver instanceof BKNamespaceDriver); return ((BKNamespaceDriver) driver).getWriterZKC(); } @SuppressWarnings("deprecation") - protected BookKeeperClient getBookKeeperClient(DistributedLogNamespace namespace) throws Exception { + protected BookKeeperClient getBookKeeperClient(Namespace namespace) throws Exception { NamespaceDriver driver = namespace.getNamespaceDriver(); assertTrue(driver instanceof BKNamespaceDriver); return ((BKNamespaceDriver) driver).getReaderBKC(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java index 6d8bd0c..30ef481 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java @@ -17,19 +17,17 @@ */ package org.apache.distributedlog; -import com.google.common.base.Optional; import com.google.common.collect.Lists; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.Entry.Reader; import org.apache.distributedlog.Entry.Writer; import org.apache.distributedlog.exceptions.LogRecordTooLongException; import org.apache.distributedlog.io.Buffer; import org.apache.distributedlog.io.CompressionCodec; -import com.twitter.io.Buf; -import com.twitter.util.Await; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; +import org.apache.distributedlog.util.Utils; import org.junit.Assert; import org.junit.Test; @@ -80,7 +78,7 @@ public class TestEntry { LogRecord largeRecord = new LogRecord(1L, new byte[MAX_LOGRECORD_SIZE + 1]); try { - writer.writeRecord(largeRecord, new Promise<DLSN>()); + writer.writeRecord(largeRecord, new CompletableFuture<DLSN>()); Assert.fail("Should fail on writing large record"); } catch (LogRecordTooLongException lrtle) { // expected @@ -103,12 +101,12 @@ public class TestEntry { assertEquals("zero bytes", 0, writer.getNumBytes()); assertEquals("zero records", 0, writer.getNumRecords()); - List<Future<DLSN>> writePromiseList = Lists.newArrayList(); + List<CompletableFuture<DLSN>> writePromiseList = Lists.newArrayList(); // write first 5 records for (int i = 0; i < 5; i++) { LogRecord record = new LogRecord(i, ("record-" + i).getBytes(UTF_8)); record.setPositionWithinLogSegment(i); - Promise<DLSN> writePromise = new Promise<DLSN>(); + CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>(); writer.writeRecord(record, writePromise); writePromiseList.add(writePromise); assertEquals((i + 1) + " records", (i + 1), writer.getNumRecords()); @@ -117,7 +115,7 @@ public class TestEntry { // write large record LogRecord largeRecord = new LogRecord(1L, new byte[MAX_LOGRECORD_SIZE + 1]); try { - writer.writeRecord(largeRecord, new Promise<DLSN>()); + writer.writeRecord(largeRecord, new CompletableFuture<DLSN>()); Assert.fail("Should fail on writing large record"); } catch (LogRecordTooLongException lrtle) { // expected @@ -128,7 +126,7 @@ public class TestEntry { for (int i = 0; i < 5; i++) { LogRecord record = new LogRecord(i + 5, ("record-" + (i + 5)).getBytes(UTF_8)); record.setPositionWithinLogSegment(i + 5); - Promise<DLSN> writePromise = new Promise<DLSN>(); + CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>(); writer.writeRecord(record, writePromise); writePromiseList.add(writePromise); assertEquals((i + 6) + " records", (i + 6), writer.getNumRecords()); @@ -138,7 +136,7 @@ public class TestEntry { // Test transmit complete writer.completeTransmit(1L, 1L); - List<DLSN> writeResults = Await.result(Future.collect(writePromiseList)); + List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writePromiseList)); for (int i = 0; i < 10; i++) { Assert.assertEquals(new DLSN(1L, 1L, i), writeResults.get(i)); } @@ -175,23 +173,23 @@ public class TestEntry { assertEquals("zero bytes", 0, writer.getNumBytes()); assertEquals("zero records", 0, writer.getNumRecords()); - List<Future<DLSN>> writePromiseList = Lists.newArrayList(); + List<CompletableFuture<DLSN>> writePromiseList = Lists.newArrayList(); // write first 5 records for (int i = 0; i < 5; i++) { LogRecord record = new LogRecord(i, ("record-" + i).getBytes(UTF_8)); record.setPositionWithinLogSegment(i); - Promise<DLSN> writePromise = new Promise<DLSN>(); + CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>(); writer.writeRecord(record, writePromise); writePromiseList.add(writePromise); assertEquals((i + 1) + " records", (i + 1), writer.getNumRecords()); } final LogRecordSet.Writer recordSetWriter = LogRecordSet.newWriter(1024, CompressionCodec.Type.NONE); - List<Future<DLSN>> recordSetPromiseList = Lists.newArrayList(); + List<CompletableFuture<DLSN>> recordSetPromiseList = Lists.newArrayList(); // write another 5 records as a batch for (int i = 0; i < 5; i++) { ByteBuffer record = ByteBuffer.wrap(("record-" + (i + 5)).getBytes(UTF_8)); - Promise<DLSN> writePromise = new Promise<DLSN>(); + CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>(); recordSetWriter.writeRecord(record, writePromise); recordSetPromiseList.add(writePromise); assertEquals((i + 1) + " records", (i + 1), recordSetWriter.getNumRecords()); @@ -202,8 +200,8 @@ public class TestEntry { LogRecord setRecord = new LogRecord(5L, data); setRecord.setPositionWithinLogSegment(5); setRecord.setRecordSet(); - Promise<DLSN> writePromise = new Promise<DLSN>(); - writePromise.addEventListener(new FutureEventListener<DLSN>() { + CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>(); + writePromise.whenComplete(new FutureEventListener<DLSN>() { @Override public void onSuccess(DLSN dlsn) { recordSetWriter.completeTransmit( @@ -224,7 +222,7 @@ public class TestEntry { for (int i = 0; i < 5; i++) { LogRecord record = new LogRecord(i + 10, ("record-" + (i + 10)).getBytes(UTF_8)); record.setPositionWithinLogSegment(i + 10); - writePromise = new Promise<DLSN>(); + writePromise = new CompletableFuture<DLSN>(); writer.writeRecord(record, writePromise); writePromiseList.add(writePromise); assertEquals((i + 11) + " records", (i + 11), writer.getNumRecords()); @@ -234,7 +232,7 @@ public class TestEntry { // Test transmit complete writer.completeTransmit(1L, 1L); - List<DLSN> writeResults = Await.result(Future.collect(writePromiseList)); + List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writePromiseList)); for (int i = 0; i < 5; i++) { Assert.assertEquals(new DLSN(1L, 1L, i), writeResults.get(i)); } @@ -242,7 +240,7 @@ public class TestEntry { for (int i = 0; i < 5; i++) { Assert.assertEquals(new DLSN(1L, 1L, (10 + i)), writeResults.get(6 + i)); } - List<DLSN> recordSetWriteResults = Await.result(Future.collect(recordSetPromiseList)); + List<DLSN> recordSetWriteResults = Utils.ioResult(FutureUtils.collect(recordSetPromiseList)); for (int i = 0; i < 5; i++) { Assert.assertEquals(new DLSN(1L, 1L, (5 + i)), recordSetWriteResults.get(i)); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestInterleavedReaders.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestInterleavedReaders.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestInterleavedReaders.java index fd3c4ee..c111baf 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestInterleavedReaders.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestInterleavedReaders.java @@ -17,7 +17,9 @@ */ package org.apache.distributedlog; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; +import org.apache.distributedlog.util.Utils; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,11 +83,11 @@ public class TestInterleavedReaders extends TestDistributedLogBase { BKAsyncLogWriter writer1 = dlmwrite1.startAsyncLogSegmentNonPartitioned(); for (long j = 1; j <= 4; j++) { for (int k = 1; k <= 10; k++) { - FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++))); - FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++))); + Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++))); + Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++))); } - FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); - FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); + Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); + Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); if (null == reader0) { reader0 = dlmreader0.getInputStream(1); } @@ -124,13 +126,13 @@ public class TestInterleavedReaders extends TestDistributedLogBase { writer1.setForceRolling(true); } for (int k = 1; k <= 2; k++) { - FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++))); - FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++))); + Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++))); + Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++))); writer0.setForceRolling(false); writer1.setForceRolling(false); } - FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); - FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); + Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); + Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); LOG.info("Completed {} write", j); if (null == reader0) { reader0 = dlmreader0.getInputStream(1); @@ -170,13 +172,13 @@ public class TestInterleavedReaders extends TestDistributedLogBase { writer0.setForceRolling(true); writer1.setForceRolling(true); } - FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++))); - FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++))); + Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++))); + Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++))); writer0.setForceRolling(false); writer1.setForceRolling(false); } - FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); - FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); + Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); + Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); if (null == reader0) { reader0 = dlmreader0.getInputStream(1); } @@ -212,9 +214,9 @@ public class TestInterleavedReaders extends TestDistributedLogBase { writer1.setForceRolling(true); writer1.overRideMinTimeStampToKeep(retentionPeriodOverride); } - DLSN dlsn1 = FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++))); + DLSN dlsn1 = Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++))); LOG.info("writer1 write record {}", dlsn1); - DLSN dlsn0 = FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++))); + DLSN dlsn0 = Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++))); LOG.info("writer0 write record {}", dlsn0); if (k == 5) { writer0.setForceRolling(false); @@ -223,8 +225,8 @@ public class TestInterleavedReaders extends TestDistributedLogBase { } Thread.sleep(5); } - FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); - FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); + Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); + Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); } writer0.close(); writer1.close(); @@ -264,15 +266,15 @@ public class TestInterleavedReaders extends TestDistributedLogBase { writer0.setForceRecovery(true); writer1.setForceRecovery(true); } - DLSN dlsn1 = FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++))); + DLSN dlsn1 = Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++))); LOG.info("writer1 write record {} - txid = {}", dlsn1, txid-1); - DLSN dlsn0 = FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++))); + DLSN dlsn0 = Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++))); LOG.info("writer0 write record {} - txid = {}", dlsn0, txid-1); writer0.setForceRecovery(false); writer1.setForceRecovery(false); } - FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); - FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); + Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); + Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); if (null == reader0) { reader0 = dlmreader0.getInputStream(1); } @@ -313,13 +315,13 @@ public class TestInterleavedReaders extends TestDistributedLogBase { writer1.setForceRolling(true); } for (int k = 1; k <= 2; k++) { - FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++))); - FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++))); + Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++))); + Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++))); writer0.setForceRolling(false); writer1.setForceRolling(false); } - FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); - FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); + Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); + Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1))); if (null == reader0) { reader0 = dlmreader0.getInputStream(1); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentCreation.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentCreation.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentCreation.java index 152e4d8..8bdf86d 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentCreation.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentCreation.java @@ -20,9 +20,10 @@ package org.apache.distributedlog; import java.net.URI; import java.util.List; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; -import com.twitter.util.Await; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.distributedlog.util.Utils; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +44,7 @@ public class TestLogSegmentCreation extends TestDistributedLogBase { .setImmediateFlushEnabled(true) .setEnableLedgerAllocatorPool(true) .setLedgerAllocatorPoolName("test"); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(conf).uri(uri).build(); DistributedLogManager dlm = namespace.openLog(name); final int numSegments = 3; @@ -68,7 +69,7 @@ public class TestLogSegmentCreation extends TestDistributedLogBase { writer2.closeAndComplete(); try { - Await.result(writer1.write(DLMTestUtil.getLogRecordInstance(numSegments + 1))); + Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(numSegments + 1))); fail("Should fail on writing new log records."); } catch (Throwable t) { LOG.error("Failed to write entry : ", t); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentMetadata.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentMetadata.java index 31df059..39ffe85 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentMetadata.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentMetadata.java @@ -21,8 +21,7 @@ import org.apache.distributedlog.LogSegmentMetadata.LogSegmentMetadataBuilder; import org.apache.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion; import org.apache.distributedlog.LogSegmentMetadata.TruncationStatus; import org.apache.distributedlog.exceptions.UnsupportedMetadataVersionException; - -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.util.Utils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -64,7 +63,7 @@ public class TestLogSegmentMetadata extends ZooKeeperClusterTestCase { LogSegmentMetadata metadata1 = new LogSegmentMetadataBuilder("/metadata1", LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION, 1000, 1).setRegionId(TEST_REGION_ID).build(); metadata1.write(zkc); - LogSegmentMetadata read1 = FutureUtils.result(LogSegmentMetadata.read(zkc, "/metadata1")); + LogSegmentMetadata read1 = Utils.ioResult(LogSegmentMetadata.read(zkc, "/metadata1")); assertEquals(metadata1, read1); assertEquals(TEST_REGION_ID, read1.getRegionId()); } @@ -75,7 +74,7 @@ public class TestLogSegmentMetadata extends ZooKeeperClusterTestCase { 1, 1000, 1).setRegionId(TEST_REGION_ID).build(); metadata1.write(zkc); // synchronous read - LogSegmentMetadata read1 = FutureUtils.result(LogSegmentMetadata.read(zkc, "/metadata2", true)); + LogSegmentMetadata read1 = Utils.ioResult(LogSegmentMetadata.read(zkc, "/metadata2", true)); assertEquals(read1.getLogSegmentId(), metadata1.getLogSegmentId()); assertEquals(read1.getFirstTxId(), metadata1.getFirstTxId()); assertEquals(read1.getLastTxId(), metadata1.getLastTxId()); @@ -90,7 +89,7 @@ public class TestLogSegmentMetadata extends ZooKeeperClusterTestCase { metadata1.write(zkc); // synchronous read try { - LogSegmentMetadata read1 = FutureUtils.result(LogSegmentMetadata.read(zkc, "/metadata-failure")); + LogSegmentMetadata read1 = Utils.ioResult(LogSegmentMetadata.read(zkc, "/metadata-failure")); fail("The previous statement should throw an exception"); } catch (UnsupportedMetadataVersionException e) { // Expected http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java index 8c01a5c..fcc3395 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java @@ -17,11 +17,12 @@ */ package org.apache.distributedlog; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.exceptions.DLIllegalStateException; import org.apache.distributedlog.exceptions.UnexpectedException; import org.apache.distributedlog.metadata.LogMetadata; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.util.DLUtils; import org.apache.bookkeeper.meta.ZkVersion; import org.apache.bookkeeper.versioning.Versioned; @@ -80,7 +81,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase { .setImmediateFlushEnabled(true) .setEnableLedgerAllocatorPool(true) .setLedgerAllocatorPoolName("test"); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build(); + Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build(); namespace.createLog(streamName); MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf); @@ -111,7 +112,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase { .setImmediateFlushEnabled(true) .setEnableLedgerAllocatorPool(true) .setLedgerAllocatorPoolName("test"); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build(); + Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build(); namespace.createLog(streamName); MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf); @@ -167,7 +168,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase { .setImmediateFlushEnabled(true) .setEnableLedgerAllocatorPool(true) .setLedgerAllocatorPoolName("test"); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build(); + Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build(); namespace.createLog(streamName); MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf); @@ -223,7 +224,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase { .setImmediateFlushEnabled(true) .setEnableLedgerAllocatorPool(true) .setLedgerAllocatorPoolName("test"); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build(); + Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build(); namespace.createLog(streamName); DistributedLogManager dlm1 = namespace.openLog(streamName); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReads.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReads.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReads.java index 5bfbf45..2b02704 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReads.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReads.java @@ -21,9 +21,11 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.distributedlog.annotations.DistributedLogAnnotations; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; +import org.apache.distributedlog.common.annotations.DistributedLogAnnotations; import org.apache.distributedlog.exceptions.IdleReaderException; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.util.Utils; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -229,7 +231,7 @@ public class TestNonBlockingReads extends TestDistributedLogBase { BKAsyncLogWriter out = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); for (long j = 1; j <= segmentSize; j++) { LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); - FutureUtils.result(out.write(op)); + Utils.ioResult(out.write(op)); numRecordsWritten++; } out.closeAndComplete(); @@ -237,7 +239,7 @@ public class TestNonBlockingReads extends TestDistributedLogBase { BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true); String completedZNode = blplm.completedLedgerZNode(txid - segmentSize, txid - 1, 3); - LogSegmentMetadata metadata = FutureUtils.result(LogSegmentMetadata.read(zkClient, completedZNode)); + LogSegmentMetadata metadata = Utils.ioResult(LogSegmentMetadata.read(zkClient, completedZNode)); zkClient.get().delete(completedZNode, -1); LogSegmentMetadata metadataToChange = metadata.mutator() @@ -253,7 +255,7 @@ public class TestNonBlockingReads extends TestDistributedLogBase { BKAsyncLogWriter out = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); for (long j = 1; j <= segmentSize; j++) { LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); - FutureUtils.result(out.write(op)); + Utils.ioResult(out.write(op)); numRecordsWritten++; } out.closeAndComplete(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReadsMultiReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReadsMultiReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReadsMultiReader.java index 8f445c4..6c9e354 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReadsMultiReader.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReadsMultiReader.java @@ -18,8 +18,10 @@ package org.apache.distributedlog; import com.google.common.util.concurrent.RateLimiter; +import org.apache.distributedlog.api.AsyncLogWriter; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; import org.apache.distributedlog.exceptions.DLInterruptedException; -import org.apache.distributedlog.util.FutureUtils; import org.apache.distributedlog.util.Utils; import org.junit.Test; @@ -93,8 +95,8 @@ public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase { DistributedLogManager dlmwrite = createNewDLM(confLocal, name); final AsyncLogWriter writer = dlmwrite.startAsyncLogSegmentNonPartitioned(); - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(0))); - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(0))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1))); final AtomicInteger writeCount = new AtomicInteger(2); DistributedLogManager dlmread = createNewDLM(conf, name); @@ -116,7 +118,7 @@ public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase { while (running.get()) { limiter.acquire(); long curTxId = txid++; - dlsn = FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(curTxId))); + dlsn = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(curTxId))); writeCount.incrementAndGet(); if (curTxId % 1000 == 0) { LOG.info("writer write {}", curTxId); @@ -126,7 +128,7 @@ public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase { Utils.close(writer); } catch (DLInterruptedException die) { Thread.currentThread().interrupt(); - } catch (IOException e) { + } catch (Exception e) { } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java index ac9984b..eda8eb2 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java @@ -20,16 +20,18 @@ package org.apache.distributedlog; import com.google.common.base.Optional; import com.google.common.base.Ticker; import com.google.common.collect.Lists; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.api.AsyncLogWriter; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException; import org.apache.distributedlog.exceptions.DLIllegalStateException; import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryStore; import org.apache.distributedlog.injector.AsyncFailureInjector; import org.apache.distributedlog.logsegment.LogSegmentEntryStore; import org.apache.distributedlog.util.ConfUtils; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Promise; import org.apache.bookkeeper.stats.AlertStatsLogger; import org.apache.bookkeeper.stats.NullStatsLogger; import org.junit.After; @@ -130,14 +132,14 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { } private void ensureOrderSchedulerEmpty(String streamName) throws Exception { - final Promise<Void> promise = new Promise<Void>(); + final CompletableFuture<Void> promise = new CompletableFuture<Void>(); scheduler.submit(streamName, new Runnable() { @Override public void run() { - FutureUtils.setValue(promise, null); + FutureUtils.complete(promise, null); } }); - FutureUtils.result(promise); + Utils.ioResult(promise); } void generateCompletedLogSegments(DistributedLogManager dlm, @@ -153,12 +155,12 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { long txid = startTxId; for (long i = 0; i < numCompletedSegments; i++) { - AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter()); + AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter()); for (long j = 1; j <= segmentSize; j++) { - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(txid); ctrlRecord.setControl(); - FutureUtils.result(writer.write(ctrlRecord)); + Utils.ioResult(writer.write(ctrlRecord)); } Utils.close(writer); } @@ -167,12 +169,12 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { AsyncLogWriter createInprogressLogSegment(DistributedLogManager dlm, DistributedLogConfiguration conf, long segmentSize) throws Exception { - AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter()); + AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter()); for (long i = 1L; i <= segmentSize; i++) { - FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(i))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i))); LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(i); ctrlRecord.setControl(); - FutureUtils.result(writer.write(ctrlRecord)); + Utils.ioResult(writer.write(ctrlRecord)); } return writer; } @@ -325,8 +327,8 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { // generate list of log segments generateCompletedLogSegments(dlm, 3, 3); - AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter()); - FutureUtils.result(writer.truncate(new DLSN(2L, 1L, 0L))); + AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter()); + Utils.ioResult(writer.truncate(new DLSN(2L, 1L, 0L))); List<LogSegmentMetadata> segments = dlm.getLogSegments(); @@ -382,8 +384,8 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { // generate list of log segments generateCompletedLogSegments(dlm, 3, 2); - AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter()); - FutureUtils.result(writer.truncate(new DLSN(2L, 1L, 0L))); + AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter()); + Utils.ioResult(writer.truncate(new DLSN(2L, 1L, 0L))); List<LogSegmentMetadata> segments = dlm.getLogSegments(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadUtils.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadUtils.java index 029e872..efc9ac6 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadUtils.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadUtils.java @@ -18,16 +18,15 @@ package org.apache.distributedlog; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Optional; import com.google.common.collect.Lists; +import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.logsegment.LogSegmentFilter; -import org.apache.distributedlog.util.FutureUtils; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Await; -import com.twitter.util.Future; import org.junit.Rule; import org.junit.Test; @@ -47,7 +46,7 @@ public class TestReadUtils extends TestDistributedLogBase { @Rule public TestName runtime = new TestName(); - private Future<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId( + private CompletableFuture<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId( BKDistributedLogManager bkdlm, int logsegmentIdx, long transactionId) throws Exception { List<LogSegmentMetadata> logSegments = bkdlm.getLogSegments(); return ReadUtils.getLogRecordNotLessThanTxId( @@ -60,7 +59,7 @@ public class TestReadUtils extends TestDistributedLogBase { ); } - private Future<LogRecordWithDLSN> getFirstGreaterThanRecord(BKDistributedLogManager bkdlm, int ledgerNo, DLSN dlsn) throws Exception { + private CompletableFuture<LogRecordWithDLSN> getFirstGreaterThanRecord(BKDistributedLogManager bkdlm, int ledgerNo, DLSN dlsn) throws Exception { List<LogSegmentMetadata> ledgerList = bkdlm.getLogSegments(); return ReadUtils.asyncReadFirstUserRecord( bkdlm.getStreamName(), ledgerList.get(ledgerNo), 2, 16, new AtomicInteger(0), Executors.newFixedThreadPool(1), @@ -68,9 +67,9 @@ public class TestReadUtils extends TestDistributedLogBase { ); } - private Future<LogRecordWithDLSN> getLastUserRecord(BKDistributedLogManager bkdlm, int ledgerNo) throws Exception { + private CompletableFuture<LogRecordWithDLSN> getLastUserRecord(BKDistributedLogManager bkdlm, int ledgerNo) throws Exception { BKLogReadHandler readHandler = bkdlm.createReadHandler(); - List<LogSegmentMetadata> ledgerList = FutureUtils.result( + List<LogSegmentMetadata> ledgerList = Utils.ioResult( readHandler.readLogSegmentsFromStore( LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, @@ -89,8 +88,8 @@ public class TestReadUtils extends TestDistributedLogBase { DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1 /* txid */); DLSN dlsn = new DLSN(1,0,0); - Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn); - LogRecordWithDLSN logrec = Await.result(futureLogrec); + CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn); + LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec); assertEquals("should be an exact match", dlsn, logrec.getDlsn()); bkdlm.close(); } @@ -102,8 +101,8 @@ public class TestReadUtils extends TestDistributedLogBase { DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1 /* txid */); DLSN dlsn = new DLSN(1,1,0); - Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn); - LogRecordWithDLSN logrec = Await.result(futureLogrec); + CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn); + LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec); assertEquals("should be an exact match", dlsn, logrec.getDlsn()); bkdlm.close(); } @@ -115,8 +114,8 @@ public class TestReadUtils extends TestDistributedLogBase { DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1 /* txid */); DLSN dlsn = new DLSN(1,0,1); - Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn); - LogRecordWithDLSN logrec = Await.result(futureLogrec); + CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn); + LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec); assertEquals(new DLSN(1,1,0), logrec.getDlsn()); bkdlm.close(); } @@ -128,8 +127,8 @@ public class TestReadUtils extends TestDistributedLogBase { DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5 /* user recs */ , 1 /* txid */); DLSN dlsn = new DLSN(2,0,0); - Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn); - LogRecordWithDLSN logrec = Await.result(futureLogrec); + CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn); + LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec); assertEquals(null, logrec); bkdlm.close(); } @@ -144,8 +143,8 @@ public class TestReadUtils extends TestDistributedLogBase { txid += DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5 /* user recs */ , txid); DLSN dlsn = new DLSN(1,3,0); - Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 1, dlsn); - LogRecordWithDLSN logrec = Await.result(futureLogrec); + CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 1, dlsn); + LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec); assertEquals(new DLSN(2,0,0), logrec.getDlsn()); bkdlm.close(); } @@ -157,8 +156,8 @@ public class TestReadUtils extends TestDistributedLogBase { DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 5 /* control recs */, 5, 1 /* txid */); DLSN dlsn = new DLSN(1,3,0); - Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn); - LogRecordWithDLSN logrec = Await.result(futureLogrec); + CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn); + LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec); assertEquals(new DLSN(1,5,0), logrec.getDlsn()); bkdlm.close(); } @@ -169,8 +168,8 @@ public class TestReadUtils extends TestDistributedLogBase { BKDistributedLogManager bkdlm = (BKDistributedLogManager) createNewDLM(conf, streamName); DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 5 /* control recs */, 5, 1 /* txid */); - Future<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0); - LogRecordWithDLSN logrec = Await.result(futureLogrec); + CompletableFuture<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0); + LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec); assertEquals(new DLSN(1,9,0), logrec.getDlsn()); bkdlm.close(); } @@ -182,15 +181,15 @@ public class TestReadUtils extends TestDistributedLogBase { AsyncLogWriter out = bkdlm.startAsyncLogSegmentNonPartitioned(); int txid = 1; - Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); - Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); - Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); - Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, true))); - Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, true))); + Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); + Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); + Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); + Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, true))); + Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, true))); Utils.close(out); - Future<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0); - LogRecordWithDLSN logrec = Await.result(futureLogrec); + CompletableFuture<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0); + LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec); assertEquals(new DLSN(1,2,0), logrec.getDlsn()); bkdlm.close(); } @@ -201,8 +200,8 @@ public class TestReadUtils extends TestDistributedLogBase { BKDistributedLogManager bkdlm = (BKDistributedLogManager) createNewDLM(conf, streamName); DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 5 /* control recs */, 0, 1 /* txid */); - Future<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0); - LogRecordWithDLSN logrec = Await.result(futureLogrec); + CompletableFuture<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0); + LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec); assertEquals(null, logrec); bkdlm.close(); } @@ -259,7 +258,7 @@ public class TestReadUtils extends TestDistributedLogBase { DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0 /* control recs */, 1, 1 /* txid */); Optional<LogRecordWithDLSN> result = - FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, 999L)); + Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, 999L)); assertFalse(result.isPresent()); } @@ -270,7 +269,7 @@ public class TestReadUtils extends TestDistributedLogBase { DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0 /* control recs */, 1, 999L /* txid */); Optional<LogRecordWithDLSN> result = - FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, 99L)); + Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, 99L)); assertTrue(result.isPresent()); assertEquals(999L, result.get().getTransactionId()); assertEquals(0L, result.get().getDlsn().getEntryId()); @@ -284,7 +283,7 @@ public class TestReadUtils extends TestDistributedLogBase { DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0 /* control recs */, 5, 1L /* txid */); Optional<LogRecordWithDLSN> result = - FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, 3L)); + Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, 3L)); assertTrue(result.isPresent()); assertEquals(3L, result.get().getTransactionId()); } @@ -296,7 +295,7 @@ public class TestReadUtils extends TestDistributedLogBase { DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0 /* control recs */, 100, 1L /* txid */); Optional<LogRecordWithDLSN> result = - FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, 9L)); + Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, 9L)); assertTrue(result.isPresent()); assertEquals(9L, result.get().getTransactionId()); } @@ -308,7 +307,7 @@ public class TestReadUtils extends TestDistributedLogBase { DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0 /* control recs */, 100, 1L /* txid */, 3L); Optional<LogRecordWithDLSN> result = - FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, 23L)); + Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, 23L)); assertTrue(result.isPresent()); assertEquals(25L, result.get().getTransactionId()); } @@ -321,22 +320,22 @@ public class TestReadUtils extends TestDistributedLogBase { long txid = 1L; for (int i = 0; i < 10; ++i) { LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid); - Await.result(out.write(record)); + Utils.ioResult(out.write(record)); txid += 1; } long txidToSearch = txid; for (int i = 0; i < 10; ++i) { LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txidToSearch); - Await.result(out.write(record)); + Utils.ioResult(out.write(record)); } for (int i = 0; i < 10; ++i) { LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid); - Await.result(out.write(record)); + Utils.ioResult(out.write(record)); txid += 1; } Utils.close(out); Optional<LogRecordWithDLSN> result = - FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, txidToSearch)); + Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, txidToSearch)); assertTrue(result.isPresent()); assertEquals(10L, result.get().getDlsn().getEntryId()); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReader.java index ad5bf8e..8d9f846 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReader.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReader.java @@ -17,13 +17,13 @@ */ package org.apache.distributedlog; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.api.AsyncLogReader; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.common.concurrent.FutureEventListener; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; import java.io.IOException; import java.util.concurrent.CountDownLatch; @@ -140,8 +140,8 @@ public class TestReader implements FutureEventListener<LogRecordWithDLSN> { } private void readNext() { - Future<LogRecordWithDLSN> record = reader.readNext(); - record.addEventListener(this); + CompletableFuture<LogRecordWithDLSN> record = reader.readNext(); + record.whenComplete(this); } @Override @@ -184,12 +184,8 @@ public class TestReader implements FutureEventListener<LogRecordWithDLSN> { private void closeReader() { if (null != reader) { - reader.asyncClose().onFailure(new AbstractFunction1<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable cause) { - LOG.warn("Exception on closing reader {} : ", readerName, cause); - return BoxedUnit.UNIT; - } + reader.asyncClose().whenComplete((value, cause) -> { + LOG.warn("Exception on closing reader {} : ", readerName, cause); }); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java index 9032866..0111e4d 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java @@ -20,14 +20,16 @@ package org.apache.distributedlog; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; import org.apache.distributedlog.feature.CoreFeatureKeys; import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryReader; import org.apache.distributedlog.util.FailpointUtils; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureEventListener; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Future; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.feature.SettableFeature; @@ -35,9 +37,7 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.distributedlog.annotations.DistributedLogAnnotations.FlakyTest; -import com.twitter.util.Await; -import com.twitter.util.FutureEventListener; +import org.apache.distributedlog.common.annotations.DistributedLogAnnotations.FlakyTest; import static com.google.common.base.Charsets.UTF_8; import static org.junit.Assert.*; @@ -79,7 +79,7 @@ public class TestRollLogSegments extends TestDistributedLogBase { // send requests in parallel for (int i = 1; i <= numEntries; i++) { final int entryId = i; - writer.write(DLMTestUtil.getLogRecordInstance(entryId)).addEventListener(new FutureEventListener<DLSN>() { + writer.write(DLMTestUtil.getLogRecordInstance(entryId)).whenComplete(new FutureEventListener<DLSN>() { @Override public void onSuccess(DLSN value) { @@ -125,7 +125,9 @@ public class TestRollLogSegments extends TestDistributedLogBase { // send requests in parallel to have outstanding requests for (int i = 1; i <= numEntries; i++) { final int entryId = i; - Future<DLSN> writeFuture = writer.write(DLMTestUtil.getLogRecordInstance(entryId)).addEventListener(new FutureEventListener<DLSN>() { + CompletableFuture<DLSN> writeFuture = + writer.write(DLMTestUtil.getLogRecordInstance(entryId)) + .whenComplete(new FutureEventListener<DLSN>() { @Override public void onSuccess(DLSN value) { @@ -146,7 +148,7 @@ public class TestRollLogSegments extends TestDistributedLogBase { }); if (i == 1) { // wait for first log segment created - FutureUtils.result(writeFuture); + Utils.ioResult(writeFuture); } } latch.await(); @@ -191,7 +193,7 @@ public class TestRollLogSegments extends TestDistributedLogBase { long txId = 1L; // Create Log Segments - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId))); FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_StartLogSegmentBeforeLedgerCreate, FailpointUtils.FailPointActions.FailPointAction_Throw); @@ -201,7 +203,7 @@ public class TestRollLogSegments extends TestDistributedLogBase { final int numRecords = 10; final CountDownLatch latch = new CountDownLatch(numRecords); for (int i = 0; i < numRecords; i++) { - writer.write(DLMTestUtil.getLogRecordInstance(++txId)).addEventListener(new FutureEventListener<DLSN>() { + writer.write(DLMTestUtil.getLogRecordInstance(++txId)).whenComplete(new FutureEventListener<DLSN>() { @Override public void onSuccess(DLSN value) { logger.info("Completed entry : {}.", value); @@ -266,7 +268,7 @@ public class TestRollLogSegments extends TestDistributedLogBase { // send requests in parallel to have outstanding requests for (int i = 1; i <= numLogSegments; i++) { final int entryId = i; - Future<DLSN> writeFuture = writer.write(DLMTestUtil.getLogRecordInstance(entryId)).addEventListener(new FutureEventListener<DLSN>() { + CompletableFuture<DLSN> writeFuture = writer.write(DLMTestUtil.getLogRecordInstance(entryId)).whenComplete(new FutureEventListener<DLSN>() { @Override public void onSuccess(DLSN value) { logger.info("Completed entry {} : {}.", entryId, value); @@ -279,7 +281,7 @@ public class TestRollLogSegments extends TestDistributedLogBase { }); if (i == 1) { // wait for first log segment created - FutureUtils.result(writeFuture); + Utils.ioResult(writeFuture); } } latch.await(); @@ -297,7 +299,7 @@ public class TestRollLogSegments extends TestDistributedLogBase { // writer should work after rolling log segments // there would be (numLogSegments/2) segments based on current rolling policy for (int i = 1; i <= numLogSegments; i++) { - DLSN newDLSN = Await.result(writer.write(DLMTestUtil.getLogRecordInstance(numLogSegments + i))); + DLSN newDLSN = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(numLogSegments + i))); logger.info("Completed entry {} : {}", numLogSegments + i, newDLSN); } @@ -364,7 +366,7 @@ public class TestRollLogSegments extends TestDistributedLogBase { // 2) reader should be able to read 5 entries. for (long i = 1; i <= numEntries; i++) { - LogRecordWithDLSN record = Await.result(reader.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); DLMTestUtil.verifyLogRecord(record); assertEquals(i, record.getTransactionId()); assertEquals(record.getTransactionId() - 1, record.getSequenceId()); @@ -418,7 +420,7 @@ public class TestRollLogSegments extends TestDistributedLogBase { anotherWriter.closeAndComplete(); for (long i = numEntries + 1; i <= numEntries + 3; i++) { - LogRecordWithDLSN record = Await.result(reader.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); DLMTestUtil.verifyLogRecord(record); assertEquals(i, record.getTransactionId()); }