http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java b/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java index 4d4a008..5189104 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java @@ -17,6 +17,7 @@ */ package org.apache.distributedlog.lock; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.DLMTestUtil; import org.apache.distributedlog.exceptions.LockingException; import org.apache.distributedlog.ZooKeeperClient; @@ -27,11 +28,10 @@ import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; import org.apache.distributedlog.lock.ZKSessionLock.State; import org.apache.distributedlog.util.FailpointUtils; import org.apache.distributedlog.util.OrderedScheduler; -import com.twitter.util.Await; -import com.twitter.util.Promise; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.SafeRunnable; import org.apache.commons.lang3.tuple.Pair; +import org.apache.distributedlog.util.Utils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; @@ -43,7 +43,6 @@ import org.junit.Test; import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.BoxedUnit; import java.io.IOException; import java.util.Collections; @@ -149,9 +148,9 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { String node2 = getLockIdFromPath(createLockNodeV2(zk, lockPath, clientId)); String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId)); - assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node1))); - assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node2))); - assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node3))); + assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node1))); + assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node2))); + assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node3))); // Bad Lock Node Name String node4 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member")); @@ -160,15 +159,15 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { String node7 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode")); String node8 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode_badnode")); - assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node4))); - assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node5))); - assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node6))); - assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node7))); - assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node8))); + assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node4))); + assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node5))); + assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node6))); + assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node7))); + assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node8))); // Malformed Node Name String node9 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_malformed_s12345678_999999")); - assertEquals(Pair.of("malformed", 12345678L), Await.result(asyncParseClientID(zk, lockPath, node9))); + assertEquals(Pair.of("malformed", 12345678L), Utils.ioResult(asyncParseClientID(zk, lockPath, node9))); } @Test(timeout = 60000) @@ -256,7 +255,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals("counter should not be increased in different epochs", 1, counter.get()); // lock action would not be executed in same epoch and promise would be satisfied with exception - Promise<BoxedUnit> promise = new Promise<BoxedUnit>(); + CompletableFuture<Void> promise = new CompletableFuture<Void>(); lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() { @Override public void execute() { @@ -269,7 +268,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { } }, promise); try { - Await.result(promise); + Utils.ioResult(promise); fail("Should satisfy promise with epoch changed exception."); } catch (EpochChangedException ece) { // expected @@ -457,7 +456,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(State.CLAIMED, lock.getLockState()); List<String> children = getLockWaiters(zkc, lockPath); assertEquals(1, children.size()); - assertEquals(lock.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); + assertEquals(lock.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); // lock should fail on a success lock try { @@ -469,7 +468,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(State.CLAIMED, lock.getLockState()); children = getLockWaiters(zkc, lockPath); assertEquals(1, children.size()); - assertEquals(lock.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); + assertEquals(lock.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); // unlock lock.unlock(); @@ -546,7 +545,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(State.CLAIMED, lock0.getLockState()); List<String> children = getLockWaiters(zkc0, lockPath); assertEquals(1, children.size()); - assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); try { lock1.tryLock(timeout, TimeUnit.MILLISECONDS); @@ -559,7 +558,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(State.CLOSED, lock1.getLockState()); children = getLockWaiters(zkc0, lockPath); assertEquals(1, children.size()); - assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); lock0.unlock(); // verification after unlock lock0 @@ -574,7 +573,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(State.CLAIMED, lock2.getLockState()); children = getLockWaiters(zkc, lockPath); assertEquals(1, children.size()); - assertEquals(lock2.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); + assertEquals(lock2.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); lock2.unlock(); } @@ -649,7 +648,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(State.CLAIMED, lock0.getLockState()); List<String> children = getLockWaiters(zkc0, lockPath); assertEquals(1, children.size()); - assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); final CountDownLatch lock1DoneLatch = new CountDownLatch(1); @@ -687,7 +686,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(State.CLAIMED, lock1.getLockState()); children = getLockWaiters(zkc, lockPath); assertEquals(1, children.size()); - assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); + assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); lock1.unlock(); } @@ -719,7 +718,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(State.CLAIMED, lock.getLockState()); List<String> children = getLockWaiters(zkc, lockPath); assertEquals(1, children.size()); - assertEquals(lock.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); + assertEquals(lock.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); ZooKeeperClientUtils.expireSession(zkc, zkServers, sessionTimeoutMs); expiredLatch.await(); @@ -806,7 +805,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(State.CLAIMED, lock0.getLockState()); List<String> children = getLockWaiters(zkc0, lockPath); assertEquals(1, children.size()); - assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId1, lockStateExecutor); final CountDownLatch lock1DoneLatch = new CountDownLatch(1); @@ -830,9 +829,9 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(2, children.size()); assertEquals(State.CLAIMED, lock0.getLockState()); - assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); awaitState(State.WAITING, lock1); - assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1)))); + assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1)))); // expire lock1 ZooKeeperClientUtils.expireSession(zkc, zkServers, sessionTimeoutMs); @@ -843,7 +842,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(State.CLOSED, lock1.getLockState()); children = getLockWaiters(zkc0, lockPath); assertEquals(1, children.size()); - assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); } public void awaitState(State state, ZKSessionLock lock) throws InterruptedException { @@ -891,7 +890,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(State.CLOSED, lock1_0.getLockState()); List<String> children = getLockWaiters(zkc0, lockPath); assertEquals(1, children.size()); - assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); // lock1_1 would wait the ownership final ZKSessionLock lock1_1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor); @@ -917,9 +916,9 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(2, children.size()); assertEquals(State.CLAIMED, lock0.getLockState()); - assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); awaitState(State.WAITING, lock1_1); - assertEquals(lock1_1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1)))); + assertEquals(lock1_1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1)))); if (isUnlock) { lock0.unlock(); @@ -938,7 +937,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(State.CLAIMED, lock1_1.getLockState()); children = getLockWaiters(zkc, lockPath); assertEquals(1, children.size()); - assertEquals(lock1_1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); + assertEquals(lock1_1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); lock1_1.unlock(); } @@ -1040,9 +1039,9 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(2, children.size()); assertEquals(State.CLAIMED, lock0_0.getLockState()); - assertEquals(lock0_0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lock0_0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); awaitState(State.WAITING, lock1); - assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1)))); + assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1)))); final CountDownLatch lock0DoneLatch = new CountDownLatch(1); final AtomicReference<String> ownerFromLock0 = new AtomicReference<String>(null); @@ -1058,9 +1057,9 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { children = getLockWaiters(zkc, lockPath); assertEquals(2, children.size()); assertEquals(State.CLAIMED, lock0_0.getLockState()); - assertEquals(lock0_0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lock0_0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); assertEquals(State.WAITING, lock1.getLockState()); - assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1)))); + assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1)))); } else { lock0Thread = new Thread(new Runnable() { @Override @@ -1087,11 +1086,11 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { assertEquals(3, children.size()); assertEquals(State.CLAIMED, lock0_0.getLockState()); - assertEquals(lock0_0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lock0_0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); awaitState(State.WAITING, lock1); - assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1)))); + assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1)))); awaitState(State.WAITING, lock0_1); - assertEquals(lock0_1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(2)))); + assertEquals(lock0_1.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(2)))); } if (isUnlock) { @@ -1114,7 +1113,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { children = getLockWaiters(zkc, lockPath); assertEquals(1, children.size()); assertEquals(State.CLAIMED, lock1.getLockState()); - assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); + assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); } else { assertNotNull(lock0Thread); if (!isUnlock) { @@ -1128,14 +1127,14 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { children = getLockWaiters(zkc, lockPath); assertEquals(1, children.size()); assertEquals(State.CLAIMED, lock1.getLockState()); - assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); + assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); } else { children = getLockWaiters(zkc, lockPath); assertEquals(2, children.size()); assertEquals(State.CLAIMED, lock1.getLockState()); - assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); + assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0)))); assertEquals(State.WAITING, lock0_1.getLockState()); - assertEquals(lock0_1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(1)))); + assertEquals(lock0_1.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(1)))); } } @@ -1148,7 +1147,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { children = getLockWaiters(zkc, lockPath); assertEquals(1, children.size()); assertEquals(State.CLAIMED, lock0_1.getLockState()); - assertEquals(lock0_1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lock0_1.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); } } @@ -1186,15 +1185,15 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { List<String> children = getLockWaiters(zkc0, lockPath); assertEquals(1, children.size()); assertEquals(State.CLAIMED, lock0.getLockState()); - assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); lock1.tryLock(timeout, TimeUnit.MILLISECONDS); children = getLockWaiters(zkc0, lockPath); assertEquals(2, children.size()); assertEquals(State.CLAIMED, lock0.getLockState()); - assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); assertEquals(State.CLAIMED, lock1.getLockState()); - assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(1)))); + assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(1)))); if (isUnlock) { lock0.unlock(); @@ -1202,7 +1201,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { children = getLockWaiters(zkc0, lockPath); assertEquals(1, children.size()); assertEquals(State.CLAIMED, lock1.getLockState()); - assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); + assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0)))); lock1.unlock(); } else { ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/logsegment/TestRollingPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/logsegment/TestRollingPolicy.java b/distributedlog-core/src/test/java/org/apache/distributedlog/logsegment/TestRollingPolicy.java index 5943b64..6687b7b 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/logsegment/TestRollingPolicy.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/logsegment/TestRollingPolicy.java @@ -17,7 +17,7 @@ */ package org.apache.distributedlog.logsegment; -import org.apache.distributedlog.util.Sizable; +import org.apache.distributedlog.common.util.Sizable; import org.junit.Test; import static org.junit.Assert.*; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java b/distributedlog-core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java index e18fb3f..2090828 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java @@ -24,12 +24,11 @@ import org.apache.distributedlog.LogRecordWithDLSN; import org.apache.distributedlog.LogSegmentMetadata; import org.apache.distributedlog.TestZooKeeperClientBuilder; import org.apache.distributedlog.ZooKeeperClient; -import org.apache.distributedlog.ZooKeeperClientBuilder; import org.apache.distributedlog.ZooKeeperClusterTestCase; import org.apache.distributedlog.impl.ZKLogSegmentMetadataStore; import org.apache.distributedlog.logsegment.LogSegmentMetadataStore; -import org.apache.distributedlog.util.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; +import org.apache.distributedlog.util.Utils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.junit.After; @@ -105,7 +104,7 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase // Dryrun MetadataUpdater dryrunUpdater = new DryrunLogSegmentMetadataStoreUpdater(conf, metadataStore); - FutureUtils.result(dryrunUpdater.changeSequenceNumber(segment, 6L)); + Utils.ioResult(dryrunUpdater.changeSequenceNumber(segment, 6L)); segmentList = readLogSegments(ledgerPath); assertEquals(5, segmentList.size()); @@ -113,7 +112,7 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase // Fix the inprogress log segments MetadataUpdater updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore); - FutureUtils.result(updater.changeSequenceNumber(segment, 6L)); + Utils.ioResult(updater.changeSequenceNumber(segment, 6L)); segmentList = readLogSegments(ledgerPath); assertEquals(6, segmentList.size()); @@ -156,19 +155,19 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase // Dryrun MetadataUpdater dryrunUpdater = new DryrunLogSegmentMetadataStoreUpdater(conf, metadataStore); try { - FutureUtils.result(dryrunUpdater.updateLastRecord(completedLogSegment, badRecord)); + Utils.ioResult(dryrunUpdater.updateLastRecord(completedLogSegment, badRecord)); fail("Should fail on updating dlsn that in different log segment"); } catch (IllegalArgumentException iae) { // expected } try { - FutureUtils.result(dryrunUpdater.updateLastRecord(inprogressLogSegment, goodRecord2)); + Utils.ioResult(dryrunUpdater.updateLastRecord(inprogressLogSegment, goodRecord2)); fail("Should fail on updating dlsn for an inprogress log segment"); } catch (IllegalStateException ise) { // expected } LogSegmentMetadata updatedCompletedLogSegment = - FutureUtils.result(dryrunUpdater.updateLastRecord(completedLogSegment, goodRecord1)); + Utils.ioResult(dryrunUpdater.updateLastRecord(completedLogSegment, goodRecord1)); assertEquals(goodLastDLSN1, updatedCompletedLogSegment.getLastDLSN()); assertEquals(goodRecord1.getTransactionId(), updatedCompletedLogSegment.getLastTxId()); assertTrue(updatedCompletedLogSegment.isRecordLastPositioninThisSegment(goodRecord1)); @@ -187,18 +186,18 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase // Fix the last dlsn MetadataUpdater updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore); try { - FutureUtils.result(updater.updateLastRecord(completedLogSegment, badRecord)); + Utils.ioResult(updater.updateLastRecord(completedLogSegment, badRecord)); fail("Should fail on updating dlsn that in different log segment"); } catch (IllegalArgumentException iae) { // expected } try { - FutureUtils.result(updater.updateLastRecord(inprogressLogSegment, goodRecord2)); + Utils.ioResult(updater.updateLastRecord(inprogressLogSegment, goodRecord2)); fail("Should fail on updating dlsn for an inprogress log segment"); } catch (IllegalStateException ise) { // expected } - updatedCompletedLogSegment = FutureUtils.result(updater.updateLastRecord(completedLogSegment, goodRecord1)); + updatedCompletedLogSegment = Utils.ioResult(updater.updateLastRecord(completedLogSegment, goodRecord1)); assertEquals(goodLastDLSN1, updatedCompletedLogSegment.getLastDLSN()); assertEquals(goodRecord1.getTransactionId(), updatedCompletedLogSegment.getLastTxId()); assertTrue(updatedCompletedLogSegment.isRecordLastPositioninThisSegment(goodRecord1)); @@ -245,28 +244,28 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase // Dryrun MetadataUpdater dryrunUpdater = new DryrunLogSegmentMetadataStoreUpdater(conf, metadataStore); - FutureUtils.result(dryrunUpdater.setLogSegmentTruncated(segmentList.get(segmentToModify))); + Utils.ioResult(dryrunUpdater.setLogSegmentTruncated(segmentList.get(segmentToModify))); segmentList = readLogSegments(ledgerPath); assertEquals(false, segmentList.get(segmentToModify).isTruncated()); // change truncation for the 1st log segment MetadataUpdater updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore); - FutureUtils.result(updater.setLogSegmentTruncated(segmentList.get(segmentToModify))); + Utils.ioResult(updater.setLogSegmentTruncated(segmentList.get(segmentToModify))); segmentList = readLogSegments(ledgerPath); assertEquals(true, segmentList.get(segmentToModify).isTruncated()); assertEquals(false, segmentList.get(segmentToModify).isPartiallyTruncated()); updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore); - FutureUtils.result(updater.setLogSegmentActive(segmentList.get(segmentToModify))); + Utils.ioResult(updater.setLogSegmentActive(segmentList.get(segmentToModify))); segmentList = readLogSegments(ledgerPath); assertEquals(false, segmentList.get(segmentToModify).isTruncated()); assertEquals(false, segmentList.get(segmentToModify).isPartiallyTruncated()); updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore); - FutureUtils.result(updater.setLogSegmentPartiallyTruncated(segmentList.get(segmentToModify), + Utils.ioResult(updater.setLogSegmentPartiallyTruncated(segmentList.get(segmentToModify), segmentList.get(segmentToModify).getFirstDLSN())); segmentList = readLogSegments(ledgerPath); @@ -274,7 +273,7 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase assertEquals(true, segmentList.get(segmentToModify).isPartiallyTruncated()); updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore); - FutureUtils.result(updater.setLogSegmentActive(segmentList.get(segmentToModify))); + Utils.ioResult(updater.setLogSegmentActive(segmentList.get(segmentToModify))); segmentList = readLogSegments(ledgerPath); assertEquals(false, segmentList.get(segmentToModify).isTruncated()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java b/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java deleted file mode 100644 index 46a3a6f..0000000 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.namespace; - -import org.apache.distributedlog.BKDistributedLogNamespace; -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.TestDistributedLogBase; -import org.junit.Test; - -import java.net.URI; - -import static org.apache.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE; -import static org.junit.Assert.assertTrue; - -/** - * Test Namespace Builder - */ -public class TestDistributedLogNamespaceBuilder extends TestDistributedLogBase { - - @Test(timeout = 60000, expected = NullPointerException.class) - public void testEmptyBuilder() throws Exception { - DistributedLogNamespaceBuilder.newBuilder().build(); - } - - @Test(timeout = 60000, expected = NullPointerException.class) - public void testMissingUri() throws Exception { - DistributedLogNamespaceBuilder.newBuilder() - .conf(new DistributedLogConfiguration()) - .build(); - } - - @Test(timeout = 60000, expected = NullPointerException.class) - public void testMissingSchemeInUri() throws Exception { - DistributedLogNamespaceBuilder.newBuilder() - .conf(new DistributedLogConfiguration()) - .uri(new URI("/test")) - .build(); - } - - @Test(timeout = 60000, expected = IllegalArgumentException.class) - public void testInvalidSchemeInUri() throws Exception { - DistributedLogNamespaceBuilder.newBuilder() - .conf(new DistributedLogConfiguration()) - .uri(new URI("dist://invalid/scheme/in/uri")) - .build(); - } - - @Test(timeout = 60000, expected = IllegalArgumentException.class) - public void testInvalidSchemeCorrectBackendInUri() throws Exception { - DistributedLogNamespaceBuilder.newBuilder() - .conf(new DistributedLogConfiguration()) - .uri(new URI("dist-bk://invalid/scheme/in/uri")) - .build(); - } - - @Test(timeout = 60000, expected = IllegalArgumentException.class) - public void testUnknownBackendInUri() throws Exception { - DistributedLogNamespaceBuilder.newBuilder() - .conf(new DistributedLogConfiguration()) - .uri(new URI("distributedlog-unknown://invalid/scheme/in/uri")) - .build(); - } - - @Test(timeout = 60000, expected = NullPointerException.class) - public void testNullStatsLogger() throws Exception { - DistributedLogNamespaceBuilder.newBuilder() - .conf(new DistributedLogConfiguration()) - .uri(new URI("distributedlog-bk://localhost/distributedlog")) - .statsLogger(null) - .build(); - } - - @Test(timeout = 60000, expected = NullPointerException.class) - public void testNullClientId() throws Exception { - DistributedLogNamespaceBuilder.newBuilder() - .conf(new DistributedLogConfiguration()) - .uri(new URI("distributedlog-bk://localhost/distributedlog")) - .clientId(null) - .build(); - } - - @Test(timeout = 60000) - public void testBuildBKDistributedLogNamespace() throws Exception { - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() - .conf(new DistributedLogConfiguration()) - .uri(new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace")) - .build(); - try { - assertTrue("distributedlog-bk:// should build bookkeeper based distributedlog namespace", - namespace instanceof BKDistributedLogNamespace); - } finally { - namespace.close(); - } - } - - @Test(timeout = 60000) - public void testBuildWhenMissingBackendInUri() throws Exception { - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() - .conf(new DistributedLogConfiguration()) - .uri(new URI("distributedlog://" + zkServers + DLOG_NAMESPACE + "/defaultnamespace")) - .build(); - try { - assertTrue("distributedlog:// should build bookkeeper based distributedlog namespace", - namespace instanceof BKDistributedLogNamespace); - } finally { - namespace.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java b/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java new file mode 100644 index 0000000..89b4852 --- /dev/null +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.namespace; + +import org.apache.distributedlog.BKDistributedLogNamespace; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.TestDistributedLogBase; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.junit.Test; + +import java.net.URI; + +import static org.apache.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE; +import static org.junit.Assert.assertTrue; + +/** + * Test Namespace Builder + */ +public class TestNamespaceBuilder extends TestDistributedLogBase { + + @Test(timeout = 60000, expected = NullPointerException.class) + public void testEmptyBuilder() throws Exception { + NamespaceBuilder.newBuilder().build(); + } + + @Test(timeout = 60000, expected = NullPointerException.class) + public void testMissingUri() throws Exception { + NamespaceBuilder.newBuilder() + .conf(new DistributedLogConfiguration()) + .build(); + } + + @Test(timeout = 60000, expected = NullPointerException.class) + public void testMissingSchemeInUri() throws Exception { + NamespaceBuilder.newBuilder() + .conf(new DistributedLogConfiguration()) + .uri(new URI("/test")) + .build(); + } + + @Test(timeout = 60000, expected = IllegalArgumentException.class) + public void testInvalidSchemeInUri() throws Exception { + NamespaceBuilder.newBuilder() + .conf(new DistributedLogConfiguration()) + .uri(new URI("dist://invalid/scheme/in/uri")) + .build(); + } + + @Test(timeout = 60000, expected = IllegalArgumentException.class) + public void testInvalidSchemeCorrectBackendInUri() throws Exception { + NamespaceBuilder.newBuilder() + .conf(new DistributedLogConfiguration()) + .uri(new URI("dist-bk://invalid/scheme/in/uri")) + .build(); + } + + @Test(timeout = 60000, expected = IllegalArgumentException.class) + public void testUnknownBackendInUri() throws Exception { + NamespaceBuilder.newBuilder() + .conf(new DistributedLogConfiguration()) + .uri(new URI("distributedlog-unknown://invalid/scheme/in/uri")) + .build(); + } + + @Test(timeout = 60000, expected = NullPointerException.class) + public void testNullStatsLogger() throws Exception { + NamespaceBuilder.newBuilder() + .conf(new DistributedLogConfiguration()) + .uri(new URI("distributedlog-bk://localhost/distributedlog")) + .statsLogger(null) + .build(); + } + + @Test(timeout = 60000, expected = NullPointerException.class) + public void testNullClientId() throws Exception { + NamespaceBuilder.newBuilder() + .conf(new DistributedLogConfiguration()) + .uri(new URI("distributedlog-bk://localhost/distributedlog")) + .clientId(null) + .build(); + } + + @Test(timeout = 60000) + public void testBuildBKDistributedLogNamespace() throws Exception { + Namespace namespace = NamespaceBuilder.newBuilder() + .conf(new DistributedLogConfiguration()) + .uri(new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace")) + .build(); + try { + assertTrue("distributedlog-bk:// should build bookkeeper based distributedlog namespace", + namespace instanceof BKDistributedLogNamespace); + } finally { + namespace.close(); + } + } + + @Test(timeout = 60000) + public void testBuildWhenMissingBackendInUri() throws Exception { + Namespace namespace = NamespaceBuilder.newBuilder() + .conf(new DistributedLogConfiguration()) + .uri(new URI("distributedlog://" + zkServers + DLOG_NAMESPACE + "/defaultnamespace")) + .build(); + try { + assertTrue("distributedlog:// should build bookkeeper based distributedlog namespace", + namespace instanceof BKDistributedLogNamespace); + } finally { + namespace.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/rate/TestMovingAverageRate.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/rate/TestMovingAverageRate.java b/distributedlog-core/src/test/java/org/apache/distributedlog/rate/TestMovingAverageRate.java deleted file mode 100644 index 8949bec..0000000 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/rate/TestMovingAverageRate.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.rate; - -import com.twitter.util.Duration; -import com.twitter.util.Function; -import com.twitter.util.MockTimer; -import com.twitter.util.Time$; -import com.twitter.util.TimeControl; - -import org.junit.Test; -import scala.runtime.BoxedUnit; - -import static org.junit.Assert.*; - -public class TestMovingAverageRate { - interface TcCallback { - void apply(TimeControl tc); - } - - void withCurrentTimeFrozen(final TcCallback cb) { - Time$.MODULE$.withCurrentTimeFrozen(new Function<TimeControl, BoxedUnit>() { - @Override - public BoxedUnit apply(TimeControl time) { - cb.apply(time); - return BoxedUnit.UNIT; - } - }); - } - - private void advance(TimeControl time, MockTimer timer, int timeMs) { - Duration duration = Duration.fromMilliseconds(timeMs); - time.advance(duration); - timer.tick(); - } - - @Test(timeout = 60000) - public void testNoChangeInUnderMinInterval() { - withCurrentTimeFrozen(new TcCallback() { - @Override - public void apply(TimeControl time) { - MockTimer timer = new MockTimer(); - MovingAverageRateFactory factory = new MovingAverageRateFactory(timer); - MovingAverageRate avg60 = factory.create(60); - avg60.add(1000); - assertEquals(0, avg60.get(), 0); - advance(time, timer, 1); - assertEquals(0, avg60.get(), 0); - advance(time, timer, 1); - assertEquals(0, avg60.get(), 0); - } - }); - } - - @Test(timeout = 60000) - public void testFactoryWithMultipleTimers() { - withCurrentTimeFrozen(new TcCallback() { - @Override - public void apply(TimeControl time) { - MockTimer timer = new MockTimer(); - MovingAverageRateFactory factory = new MovingAverageRateFactory(timer); - MovingAverageRate avg60 = factory.create(60); - MovingAverageRate avg30 = factory.create(30); - - // Can't test this precisely because the Rate class uses its own - // ticker. So we can control when it gets sampled but not the time - // value it uses. So, just do basic validation. - for (int i = 0; i < 30; i++) { - avg60.add(100); - avg30.add(100); - advance(time, timer, 1000); - } - double s1 = avg60.get(); - assertTrue(avg30.get() > 0); - for (int i = 0; i < 30; i++) { - advance(time, timer, 1000); - } - assertTrue(avg60.get() > 0); - assertTrue(avg60.get() < s1); - assertEquals(0.0, avg30.get(), 0); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/tools/TestDistributedLogTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/tools/TestDistributedLogTool.java b/distributedlog-core/src/test/java/org/apache/distributedlog/tools/TestDistributedLogTool.java index 47e2fae..71bf68d 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/tools/TestDistributedLogTool.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/tools/TestDistributedLogTool.java @@ -22,11 +22,11 @@ import java.net.URI; import org.apache.distributedlog.DLMTestUtil; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.TestDistributedLogBase; import org.apache.distributedlog.LocalDLMEmulator; import org.apache.distributedlog.LogRecordWithDLSN; -import org.apache.distributedlog.LogReader; +import org.apache.distributedlog.api.LogReader; import org.apache.distributedlog.exceptions.ZKException; import org.apache.distributedlog.tools.DistributedLogTool.*; import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestFutureUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestFutureUtils.java b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestFutureUtils.java deleted file mode 100644 index f9e4eb8..0000000 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestFutureUtils.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.util; - -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.junit.Test; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.fail; - -/** - * Test Case for {@link FutureUtils} - */ -public class TestFutureUtils { - - static class TestException extends IOException { - } - - @Test(timeout = 60000) - public void testWithin() throws Exception { - OrderedScheduler scheduler = OrderedScheduler.newBuilder() - .corePoolSize(1) - .name("test-within") - .build(); - final Promise<Void> promiseToTimeout = new Promise<Void>(); - final Promise<Void> finalPromise = new Promise<Void>(); - FutureUtils.within( - promiseToTimeout, - 10, - TimeUnit.MILLISECONDS, - new TestException(), - scheduler, - "test-within" - ).addEventListener(new FutureEventListener<Void>() { - @Override - public void onFailure(Throwable cause) { - FutureUtils.setException(finalPromise, cause); - } - - @Override - public void onSuccess(Void value) { - FutureUtils.setValue(finalPromise, value); - } - }); - try { - FutureUtils.result(finalPromise); - fail("Should fail with TestException"); - } catch (TestException te) { - // expected - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestPermitManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestPermitManager.java b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestPermitManager.java index 802649d..807ce02 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestPermitManager.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestPermitManager.java @@ -17,6 +17,7 @@ */ package org.apache.distributedlog.util; +import org.apache.distributedlog.common.util.PermitManager; import org.apache.distributedlog.zk.LimitedPermitManager; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestSafeQueueingFuturePool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestSafeQueueingFuturePool.java b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestSafeQueueingFuturePool.java deleted file mode 100644 index 7bfe5ed..0000000 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestSafeQueueingFuturePool.java +++ /dev/null @@ -1,205 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.util; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Function0; -import com.twitter.util.FuturePool; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Future; - -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; - -import java.util.ArrayList; -import java.util.List; - -import static com.google.common.base.Charsets.UTF_8; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; - -import scala.runtime.BoxedUnit; - -public class TestSafeQueueingFuturePool { - static final Logger LOG = LoggerFactory.getLogger(TestSafeQueueingFuturePool.class); - - @Rule - public TestName runtime = new TestName(); - - class TestFuturePool<T> { - final ScheduledExecutorService executor; - final FuturePool pool; - final SafeQueueingFuturePool<T> wrapper; - TestFuturePool() { - executor = Executors.newScheduledThreadPool(1); - pool = new ExecutorServiceFuturePool(executor); - wrapper = new SafeQueueingFuturePool<T>(pool); - } - public void shutdown() { - executor.shutdown(); - } - } - - @Test(timeout = 60000) - public void testSimpleSuccess() throws Exception { - TestFuturePool<Void> pool = new TestFuturePool<Void>(); - final AtomicBoolean result = new AtomicBoolean(false); - Future<Void> future = pool.wrapper.apply(new Function0<Void>() { - public Void apply() { - result.set(true); - return null; - } - }); - Await.result(future); - assertTrue(result.get()); - pool.shutdown(); - } - - @Test(timeout = 60000) - public void testSimpleFailure() throws Exception { - TestFuturePool<Void> pool = new TestFuturePool<Void>(); - Future<Void> future = pool.wrapper.apply(new Function0<Void>() { - public Void apply() { - throw new RuntimeException("failed"); - } - }); - try { - Await.result(future); - fail("should have thrown"); - } catch (Exception ex) { - } - pool.shutdown(); - } - - @Test(timeout = 60000) - public void testFailedDueToClosed() throws Exception { - TestFuturePool<Void> pool = new TestFuturePool<Void>(); - pool.wrapper.close(); - Future<Void> future = pool.wrapper.apply(new Function0<Void>() { - public Void apply() { - throw new RuntimeException("failed"); - } - }); - try { - Await.result(future); - fail("should have thrown"); - } catch (RejectedExecutionException ex) { - } - pool.shutdown(); - } - - @Test(timeout = 60000) - public void testRejectedFailure() throws Exception { - TestFuturePool<Void> pool = new TestFuturePool<Void>(); - final AtomicBoolean result = new AtomicBoolean(false); - pool.executor.shutdown(); - final CountDownLatch latch = new CountDownLatch(1); - Future<Void> future = pool.wrapper.apply(new Function0<Void>() { - public Void apply() { - result.set(true); - latch.countDown(); - return null; - } - }); - try { - Await.result(future); - fail("should have thrown"); - } catch (RejectedExecutionException ex) { - } - assertFalse(result.get()); - pool.wrapper.close(); - latch.await(); - assertTrue(result.get()); - pool.shutdown(); - } - - @Test(timeout = 60000) - public void testRejectedBackupFailure() throws Exception { - TestFuturePool<Void> pool = new TestFuturePool<Void>(); - final AtomicBoolean result = new AtomicBoolean(false); - pool.executor.shutdownNow(); - final CountDownLatch latch1 = new CountDownLatch(1); - final CountDownLatch latch2 = new CountDownLatch(1); - Future<Void> future1 = pool.wrapper.apply(new Function0<Void>() { - public Void apply() { - try { - latch1.await(); - } catch (Exception ex) { - } - return null; - } - }); - - // Enqueue a set of futures behind. - final int blockedCount = 100; - final ArrayList<Future<Void>> blockedFutures = new ArrayList<Future<Void>>(blockedCount); - final int[] doneArray = new int[blockedCount]; - final AtomicInteger doneCount = new AtomicInteger(0); - for (int i = 0; i < blockedCount; i++) { - final int index = i; - blockedFutures.add(pool.wrapper.apply(new Function0<Void>() { - public Void apply() { - doneArray[index] = doneCount.getAndIncrement(); - return null; - } - })); - } - - // All the futures fail when the executor is force closed. - latch1.countDown(); - pool.executor.shutdownNow(); - for (int i = 0; i < blockedCount; i++) { - try { - Await.result(blockedFutures.get(i)); - fail("should have thrown"); - } catch (RejectedExecutionException ex) { - } - } - - // None of them have completed. - for (int i = 0; i < blockedCount; i++) { - assertEquals(0, doneArray[i]); - } - - // Close cleans up all pending ops in order. - pool.wrapper.close(); - for (int i = 0; i < blockedCount; i++) { - assertEquals(i, doneArray[i]); - } - - pool.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java index a9db6e0..acd441c 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java @@ -108,15 +108,15 @@ public class TestUtils extends ZooKeeperClusterTestCase { @Test(timeout = 60000) public void testZkGetData() throws Exception { String path1 = "/zk-get-data/non-existent-path"; - Versioned<byte[]> data = FutureUtils.result(Utils.zkGetData(zkc.get(), path1, false)); + Versioned<byte[]> data = Utils.ioResult(Utils.zkGetData(zkc.get(), path1, false)); assertNull("No data should return from non-existent-path", data.getValue()); assertNull("No version should return from non-existent-path", data.getVersion()); String path2 = "/zk-get-data/path2"; byte[] rawData = "test-data".getBytes(UTF_8); - FutureUtils.result(Utils.zkAsyncCreateFullPathOptimistic(zkc, path2, rawData, + Utils.ioResult(Utils.zkAsyncCreateFullPathOptimistic(zkc, path2, rawData, zkc.getDefaultACL(), CreateMode.PERSISTENT)); - data = FutureUtils.result(Utils.zkGetData(zkc.get(), path2, false)); + data = Utils.ioResult(Utils.zkGetData(zkc.get(), path2, false)); assertArrayEquals("Data should return as written", rawData, data.getValue()); assertEquals("Version should be zero", http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/pom.xml b/distributedlog-protocol/pom.xml index a483444..7197feb 100644 --- a/distributedlog-protocol/pom.xml +++ b/distributedlog-protocol/pom.xml @@ -26,35 +26,15 @@ <name>Apache DistributedLog :: Protocol</name> <dependencies> <dependency> - <groupId>org.apache.bookkeeper.stats</groupId> - <artifactId>bookkeeper-stats-api</artifactId> - <version>${bookkeeper.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> + <groupId>org.apache.distributedlog</groupId> + <artifactId>distributedlog-common</artifactId> + <version>${project.version}</version> </dependency> <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - <version>${commons-lang.version}</version> - </dependency> - <dependency> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - <version>${commons-codec.version}</version> - </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>finagle-core_2.11</artifactId> - <version>${finagle.version}</version> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>${slf4j.version}</version> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <version>${lombok.version}</version> + <scope>provided</scope> </dependency> <dependency> <groupId>net.jpountz.lz4</groupId> @@ -67,6 +47,18 @@ <version>${junit.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java index 9d2d7a7..2a60ff3 100644 --- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java +++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java @@ -26,12 +26,6 @@ import static org.apache.distributedlog.LogRecordSet.METADATA_VERSION_MASK; import static org.apache.distributedlog.LogRecordSet.NULL_OP_STATS_LOGGER; import static org.apache.distributedlog.LogRecordSet.VERSION; -import org.apache.distributedlog.exceptions.LogRecordTooLongException; -import org.apache.distributedlog.exceptions.WriteException; -import org.apache.distributedlog.io.Buffer; -import org.apache.distributedlog.io.CompressionCodec; -import org.apache.distributedlog.io.CompressionUtils; -import com.twitter.util.Promise; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -39,6 +33,12 @@ import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.exceptions.LogRecordTooLongException; +import org.apache.distributedlog.exceptions.WriteException; +import org.apache.distributedlog.io.Buffer; +import org.apache.distributedlog.io.CompressionCodec; +import org.apache.distributedlog.io.CompressionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +52,7 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer { private final Buffer buffer; private final DataOutputStream writer; private final WritableByteChannel writeChannel; - private final List<Promise<DLSN>> promiseList; + private final List<CompletableFuture<DLSN>> promiseList; private final CompressionCodec.Type codec; private final int codecCode; private int count = 0; @@ -61,7 +61,7 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer { EnvelopedRecordSetWriter(int initialBufferSize, CompressionCodec.Type codec) { this.buffer = new Buffer(Math.max(initialBufferSize, HEADER_LEN)); - this.promiseList = new LinkedList<Promise<DLSN>>(); + this.promiseList = new LinkedList<CompletableFuture<DLSN>>(); this.codec = codec; switch (codec) { case LZ4: @@ -84,13 +84,13 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer { this.writeChannel = Channels.newChannel(writer); } - synchronized List<Promise<DLSN>> getPromiseList() { + synchronized List<CompletableFuture<DLSN>> getPromiseList() { return promiseList; } @Override public synchronized void writeRecord(ByteBuffer record, - Promise<DLSN> transmitPromise) + CompletableFuture<DLSN> transmitPromise) throws LogRecordTooLongException, WriteException { int logRecordSize = record.remaining(); if (logRecordSize > MAX_LOGRECORD_SIZE) { @@ -111,16 +111,16 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer { private synchronized void satisfyPromises(long lssn, long entryId, long startSlotId) { long nextSlotId = startSlotId; - for (Promise<DLSN> promise : promiseList) { - promise.setValue(new DLSN(lssn, entryId, nextSlotId)); + for (CompletableFuture<DLSN> promise : promiseList) { + promise.complete(new DLSN(lssn, entryId, nextSlotId)); nextSlotId++; } promiseList.clear(); } private synchronized void cancelPromises(Throwable reason) { - for (Promise<DLSN> promise : promiseList) { - promise.setException(reason); + for (CompletableFuture<DLSN> promise : promiseList) { + promise.completeExceptionally(reason); } promiseList.clear(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java index 375ed3f..55b20ff 100644 --- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java +++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java @@ -19,15 +19,15 @@ package org.apache.distributedlog; import static com.google.common.base.Preconditions.checkArgument; -import org.apache.distributedlog.exceptions.LogRecordTooLongException; -import org.apache.distributedlog.exceptions.WriteException; -import org.apache.distributedlog.io.CompressionCodec; -import com.twitter.util.Promise; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.distributedlog.exceptions.LogRecordTooLongException; +import org.apache.distributedlog.exceptions.WriteException; +import org.apache.distributedlog.io.CompressionCodec; /** * A set of {@link LogRecord}s. @@ -134,7 +134,7 @@ public class LogRecordSet { * @throws LogRecordTooLongException if the record is too long * @throws WriteException when encountered exception writing the record */ - void writeRecord(ByteBuffer record, Promise<DLSN> transmitPromise) + void writeRecord(ByteBuffer record, CompletableFuture<DLSN> transmitPromise) throws LogRecordTooLongException, WriteException; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/DistributedLogAnnotations.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/DistributedLogAnnotations.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/DistributedLogAnnotations.java deleted file mode 100644 index f8cdea4..0000000 --- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/DistributedLogAnnotations.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.annotations; - -/** - * Common annotation types. - */ -public class DistributedLogAnnotations { - /** - * Annotation to identify flaky tests in DistributedLog. - * As and when we find that a test is flaky, we'll add this annotation to it for reference. - */ - public @interface FlakyTest {} - - /** - * Annotation to specify the occurrence of a compression operation. These are CPU intensive - * and should be avoided in low-latency paths. - */ - public @interface Compression {} -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/package-info.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/package-info.java deleted file mode 100644 index 0922f14..0000000 --- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Defines annotations used across distributedlog project. - */ -package org.apache.distributedlog.annotations; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/BitMaskUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/BitMaskUtils.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/BitMaskUtils.java deleted file mode 100644 index b5280c9..0000000 --- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/BitMaskUtils.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.util; - -import static com.google.common.base.Preconditions.checkArgument; - -/** - * Utils for bit mask operations. - */ -public class BitMaskUtils { - - /** - * 1) Unset all bits where value in mask is set. - * 2) Set these bits to value specified by newValue. - * - * <p>e.g. - * if oldValue = 1010, mask = 0011, newValue = 0001 - * 1) 1010 -> 1000 - * 2) 1000 -> 1001 - * - * @param oldValue expected old value - * @param mask the mask of the value for updates - * @param newValue new value to set - * @return updated value - */ - public static long set(long oldValue, long mask, long newValue) { - checkArgument(oldValue >= 0L && mask >= 0L && newValue >= 0L); - return ((oldValue & (~mask)) | (newValue & mask)); - } - - /** - * Get the bits where mask is 1. - * - * @param value value - * @param mask mask of the value - * @return the bit of the mask - */ - public static long get(long value, long mask) { - checkArgument(value >= 0L && mask >= 0L); - return (value & mask); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/package-info.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/package-info.java deleted file mode 100644 index ee17950..0000000 --- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * defines the utilities used across the project. - */ -package org.apache.distributedlog.util; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java index 95e03ab..1c5db24 100644 --- a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java +++ b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java @@ -25,15 +25,14 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; import com.google.common.collect.Lists; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.LogRecordSet.Reader; import org.apache.distributedlog.LogRecordSet.Writer; import org.apache.distributedlog.exceptions.LogRecordTooLongException; import org.apache.distributedlog.io.CompressionCodec.Type; -import com.twitter.util.Await; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import java.nio.ByteBuffer; -import java.util.List; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.junit.Test; /** @@ -72,7 +71,7 @@ public class TestLogRecordSet { ByteBuffer dataBuf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1); try { - writer.writeRecord(dataBuf, new Promise<DLSN>()); + writer.writeRecord(dataBuf, new CompletableFuture<DLSN>()); fail("Should fail on writing large record"); } catch (LogRecordTooLongException lrtle) { // expected @@ -111,18 +110,18 @@ public class TestLogRecordSet { assertEquals("zero user bytes", HEADER_LEN, writer.getNumBytes()); assertEquals("zero records", 0, writer.getNumRecords()); - List<Future<DLSN>> writePromiseList = Lists.newArrayList(); + List<CompletableFuture<DLSN>> writePromiseList = Lists.newArrayList(); /// write first 5 records for (int i = 0; i < 5; i++) { ByteBuffer record = ByteBuffer.wrap(("record-" + i).getBytes(UTF_8)); - Promise<DLSN> writePromise = new Promise<DLSN>(); + CompletableFuture<DLSN> writePromise = new CompletableFuture<>(); writer.writeRecord(record, writePromise); writePromiseList.add(writePromise); assertEquals((i + 1) + " records", (i + 1), writer.getNumRecords()); } ByteBuffer dataBuf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1); try { - writer.writeRecord(dataBuf, new Promise<DLSN>()); + writer.writeRecord(dataBuf, new CompletableFuture<>()); fail("Should fail on writing large record"); } catch (LogRecordTooLongException lrtle) { // expected @@ -132,7 +131,7 @@ public class TestLogRecordSet { /// write another 5 records for (int i = 0; i < 5; i++) { ByteBuffer record = ByteBuffer.wrap(("record-" + (i + 5)).getBytes(UTF_8)); - Promise<DLSN> writePromise = new Promise<DLSN>(); + CompletableFuture<DLSN> writePromise = new CompletableFuture<>(); writer.writeRecord(record, writePromise); writePromiseList.add(writePromise); assertEquals((i + 6) + " records", (i + 6), writer.getNumRecords()); @@ -143,7 +142,7 @@ public class TestLogRecordSet { // Test transmit complete writer.completeTransmit(1L, 1L, 10L); - List<DLSN> writeResults = Await.result(Future.collect(writePromiseList)); + List<DLSN> writeResults = FutureUtils.result(FutureUtils.collect(writePromiseList)); for (int i = 0; i < 10; i++) { assertEquals(new DLSN(1L, 1L, 10L + i), writeResults.get(i)); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestTimedOutTestsListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestTimedOutTestsListener.java b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestTimedOutTestsListener.java deleted file mode 100644 index 20cf53c..0000000 --- a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestTimedOutTestsListener.java +++ /dev/null @@ -1,183 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.notification.Failure; - -/** - * Test Case for {@link TimedOutTestsListener}. - */ -public class TestTimedOutTestsListener { - - private static class Deadlock { - private CyclicBarrier barrier = new CyclicBarrier(6); - - public Deadlock() { - DeadlockThread[] dThreads = new DeadlockThread[6]; - - Monitor a = new Monitor("a"); - Monitor b = new Monitor("b"); - Monitor c = new Monitor("c"); - dThreads[0] = new DeadlockThread("MThread-1", a, b); - dThreads[1] = new DeadlockThread("MThread-2", b, c); - dThreads[2] = new DeadlockThread("MThread-3", c, a); - - Lock d = new ReentrantLock(); - Lock e = new ReentrantLock(); - Lock f = new ReentrantLock(); - - dThreads[3] = new DeadlockThread("SThread-4", d, e); - dThreads[4] = new DeadlockThread("SThread-5", e, f); - dThreads[5] = new DeadlockThread("SThread-6", f, d); - - // make them daemon threads so that the test will exit - for (int i = 0; i < 6; i++) { - dThreads[i].setDaemon(true); - dThreads[i].start(); - } - } - - class DeadlockThread extends Thread { - private Lock lock1 = null; - - private Lock lock2 = null; - - private Monitor mon1 = null; - - private Monitor mon2 = null; - - private boolean useSync; - - DeadlockThread(String name, Lock lock1, Lock lock2) { - super(name); - this.lock1 = lock1; - this.lock2 = lock2; - this.useSync = true; - } - - DeadlockThread(String name, Monitor mon1, Monitor mon2) { - super(name); - this.mon1 = mon1; - this.mon2 = mon2; - this.useSync = false; - } - - public void run() { - if (useSync) { - syncLock(); - } else { - monitorLock(); - } - } - - private void syncLock() { - lock1.lock(); - try { - try { - barrier.await(); - } catch (Exception e) { - } - goSyncDeadlock(); - } finally { - lock1.unlock(); - } - } - - private void goSyncDeadlock() { - try { - barrier.await(); - } catch (Exception e) { - } - lock2.lock(); - throw new RuntimeException("should not reach here."); - } - - private void monitorLock() { - synchronized (mon1) { - try { - barrier.await(); - } catch (Exception e) { - } - goMonitorDeadlock(); - } - } - - private void goMonitorDeadlock() { - try { - barrier.await(); - } catch (Exception e) { - } - synchronized (mon2) { - throw new RuntimeException(getName() + " should not reach here."); - } - } - } - - class Monitor { - String name; - - Monitor(String name) { - this.name = name; - } - } - - } - - @Test(timeout = 500) - public void testThreadDumpAndDeadlocks() throws Exception { - new Deadlock(); - String s = null; - while (true) { - s = TimedOutTestsListener.buildDeadlockInfo(); - if (s != null) { - break; - } - Thread.sleep(100); - } - - Assert.assertEquals(3, countStringOccurrences(s, "BLOCKED")); - - Failure failure = new Failure(null, new Exception(TimedOutTestsListener.TEST_TIMED_OUT_PREFIX)); - StringWriter writer = new StringWriter(); - new TimedOutTestsListener(new PrintWriter(writer)).testFailure(failure); - String out = writer.toString(); - - Assert.assertTrue(out.contains("THREAD DUMP")); - Assert.assertTrue(out.contains("DEADLOCKS DETECTED")); - - System.out.println(out); - } - - private int countStringOccurrences(String s, String substr) { - int n = 0; - int index = 0; - while ((index = s.indexOf(substr, index) + 1) != 0) { - n++; - } - return n; - } - -}