http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestSequenceID.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestSequenceID.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestSequenceID.java index 9258922..da4ef81 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestSequenceID.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestSequenceID.java @@ -18,8 +18,9 @@ package org.apache.distributedlog; import org.apache.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion; -import com.twitter.util.Await; -import com.twitter.util.FutureEventListener; +import org.apache.distributedlog.api.AsyncLogReader; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.util.Utils; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +73,7 @@ public class TestSequenceID extends TestDistributedLogBase { BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(confLocal, name); BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(0L))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(0L))); dlm.close(); @@ -126,16 +127,16 @@ public class TestSequenceID extends TestDistributedLogBase { for (int i = 0; i < 3; i++) { BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); for (int j = 0; j < 2; j++) { - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); if (null == reader) { reader = readDLM.getAsyncLogReader(DLSN.InitialDLSN); final AsyncLogReader r = reader; - reader.readNext().addEventListener(new FutureEventListener<LogRecordWithDLSN>() { + reader.readNext().whenComplete(new FutureEventListener<LogRecordWithDLSN>() { @Override public void onSuccess(LogRecordWithDLSN record) { readRecords.add(record); - r.readNext().addEventListener(this); + r.readNext().whenComplete(this); } @Override @@ -149,7 +150,7 @@ public class TestSequenceID extends TestDistributedLogBase { } BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); List<LogSegmentMetadata> segments = dlm.getLogSegments(); assertEquals(4, segments.size()); @@ -174,12 +175,12 @@ public class TestSequenceID extends TestDistributedLogBase { for (int i = 0; i < 3; i++) { BKAsyncLogWriter writerv5 = dlmv5.startAsyncLogSegmentNonPartitioned(); for (int j = 0; j < 2; j++) { - Await.result(writerv5.write(DLMTestUtil.getLogRecordInstance(txId++))); + Utils.ioResult(writerv5.write(DLMTestUtil.getLogRecordInstance(txId++))); } writerv5.closeAndComplete(); } BKAsyncLogWriter writerv5 = dlmv5.startAsyncLogSegmentNonPartitioned(); - Await.result(writerv5.write(DLMTestUtil.getLogRecordInstance(txId++))); + Utils.ioResult(writerv5.write(DLMTestUtil.getLogRecordInstance(txId++))); List<LogSegmentMetadata> segmentsv5 = dlmv5.getLogSegments(); assertEquals(8, segmentsv5.size()); @@ -205,7 +206,7 @@ public class TestSequenceID extends TestDistributedLogBase { for (int i = 0; i < 3; i++) { BKAsyncLogWriter writerv4 = dlmv4.startAsyncLogSegmentNonPartitioned(); for (int j = 0; j < 2; j++) { - Await.result(writerv4.write(DLMTestUtil.getLogRecordInstance(txId++))); + Utils.ioResult(writerv4.write(DLMTestUtil.getLogRecordInstance(txId++))); } writerv4.closeAndComplete(); }
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestTruncate.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestTruncate.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestTruncate.java index 5b26a70..06708c8 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestTruncate.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestTruncate.java @@ -22,7 +22,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.api.AsyncLogWriter; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; import org.apache.distributedlog.util.Utils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -31,7 +33,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.distributedlog.LogSegmentMetadata.TruncationStatus; -import com.twitter.util.Await; import static org.junit.Assert.*; @@ -96,11 +97,11 @@ public class TestTruncate extends TestDistributedLogBase { AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); long txid = 1 + 10 * 10; for (int j = 1; j <= 10; j++) { - Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); + Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); } // to make sure the truncation task is executed - DLSN lastDLSN = Await.result(dlm.getLastDLSNAsync()); + DLSN lastDLSN = Utils.ioResult(dlm.getLastDLSNAsync()); LOG.info("Get last dlsn of stream {} : {}", name, lastDLSN); assertEquals(6, distributedLogManager.getLogSegments().size()); @@ -123,20 +124,20 @@ public class TestTruncate extends TestDistributedLogBase { Thread.sleep(1000); // delete invalid dlsn - assertFalse(Await.result(pair.getRight().truncate(DLSN.InvalidDLSN))); + assertFalse(Utils.ioResult(pair.getRight().truncate(DLSN.InvalidDLSN))); verifyEntries(name, 1, 1, 5 * 10); for (int i = 1; i <= 4; i++) { int txn = (i-1) * 10 + i; DLSN dlsn = txid2DLSN.get((long)txn); - assertTrue(Await.result(pair.getRight().truncate(dlsn))); + assertTrue(Utils.ioResult(pair.getRight().truncate(dlsn))); verifyEntries(name, 1, (i - 1) * 10 + 1, (5 - i + 1) * 10); } // Delete higher dlsn int txn = 43; DLSN dlsn = txid2DLSN.get((long) txn); - assertTrue(Await.result(pair.getRight().truncate(dlsn))); + assertTrue(Utils.ioResult(pair.getRight().truncate(dlsn))); verifyEntries(name, 1, 41, 10); Utils.close(pair.getRight()); @@ -160,14 +161,14 @@ public class TestTruncate extends TestDistributedLogBase { for (int i = 1; i <= 4; i++) { int txn = (i-1) * 10 + i; DLSN dlsn = txid2DLSN.get((long)txn); - assertTrue(Await.result(pair.getRight().truncate(dlsn))); + assertTrue(Utils.ioResult(pair.getRight().truncate(dlsn))); verifyEntries(name, 1, (i - 1) * 10 + 1, (5 - i + 1) * 10); } // Delete higher dlsn int txn = 43; DLSN dlsn = txid2DLSN.get((long) txn); - assertTrue(Await.result(pair.getRight().truncate(dlsn))); + assertTrue(Utils.ioResult(pair.getRight().truncate(dlsn))); verifyEntries(name, 1, 41, 10); Utils.close(pair.getRight()); @@ -176,7 +177,7 @@ public class TestTruncate extends TestDistributedLogBase { // Try force truncation BKDistributedLogManager dlm = (BKDistributedLogManager)createNewDLM(confLocal, name); BKLogWriteHandler handler = dlm.createWriteHandler(true); - FutureUtils.result(handler.purgeLogSegmentsOlderThanTxnId(Integer.MAX_VALUE)); + Utils.ioResult(handler.purgeLogSegmentsOlderThanTxnId(Integer.MAX_VALUE)); verifyEntries(name, 1, 41, 10); } @@ -230,11 +231,11 @@ public class TestTruncate extends TestDistributedLogBase { AsyncLogWriter newWriter = newDLM.startAsyncLogSegmentNonPartitioned(); long txid = 1 + 4 * 10; for (int j = 1; j <= 10; j++) { - Await.result(newWriter.write(DLMTestUtil.getLogRecordInstance(txid++))); + Utils.ioResult(newWriter.write(DLMTestUtil.getLogRecordInstance(txid++))); } // to make sure the truncation task is executed - DLSN lastDLSN = Await.result(newDLM.getLastDLSNAsync()); + DLSN lastDLSN = Utils.ioResult(newDLM.getLastDLSNAsync()); LOG.info("Get last dlsn of stream {} : {}", name, lastDLSN); assertEquals(5, newDLM.getLogSegments().size()); @@ -277,7 +278,7 @@ public class TestTruncate extends TestDistributedLogBase { DistributedLogManager newDLM = createNewDLM(confLocal, name); AsyncLogWriter newWriter = newDLM.startAsyncLogSegmentNonPartitioned(); - Await.result(newWriter.truncate(dlsnMap.get(15L))); + Utils.ioResult(newWriter.truncate(dlsnMap.get(15L))); List<LogSegmentMetadata> newSegments2 = newDLM.getLogSegments(); assertArrayEquals(newSegments.toArray(new LogSegmentMetadata[4]), @@ -299,7 +300,7 @@ public class TestTruncate extends TestDistributedLogBase { AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); for (int j = 1; j <= numEntriesPerLogSegment; j++) { long curTxId = txid++; - DLSN dlsn = Await.result(writer.write(DLMTestUtil.getLogRecordInstance(curTxId))); + DLSN dlsn = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(curTxId))); txid2DLSN.put(curTxId, dlsn); } Utils.close(writer); @@ -311,7 +312,7 @@ public class TestTruncate extends TestDistributedLogBase { AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); for (int j = 1; j <= 10; j++) { long curTxId = txid++; - DLSN dlsn = Await.result(writer.write(DLMTestUtil.getLogRecordInstance(curTxId))); + DLSN dlsn = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(curTxId))); txid2DLSN.put(curTxId, dlsn); } return new ImmutablePair<DistributedLogManager, AsyncLogWriter>(dlm, writer); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestWriteLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestWriteLimiter.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestWriteLimiter.java index c28437f..0d0ca99 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestWriteLimiter.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestWriteLimiter.java @@ -18,7 +18,6 @@ package org.apache.distributedlog; import org.apache.distributedlog.exceptions.OverCapacityException; -import org.apache.distributedlog.util.PermitLimiter; import org.apache.distributedlog.util.SimplePermitLimiter; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.feature.SettableFeature; @@ -30,7 +29,6 @@ import org.slf4j.LoggerFactory; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import scala.runtime.BoxedUnit; public class TestWriteLimiter { static final Logger LOG = LoggerFactory.getLogger(TestWriteLimiter.class); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestZooKeeperClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestZooKeeperClient.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestZooKeeperClient.java index a1c075f..75bcda2 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestZooKeeperClient.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestZooKeeperClient.java @@ -19,7 +19,7 @@ package org.apache.distributedlog; import org.apache.distributedlog.ZooKeeperClient.Credentials; import org.apache.distributedlog.ZooKeeperClient.DigestCredentials; -import org.apache.distributedlog.annotations.DistributedLogAnnotations; +import org.apache.distributedlog.common.annotations.DistributedLogAnnotations; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.zookeeper.CreateMode; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControl.java b/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControl.java index 8d88a37..45fc1f3 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControl.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControl.java @@ -17,20 +17,20 @@ */ package org.apache.distributedlog.acl; +import java.net.URI; import org.apache.distributedlog.TestZooKeeperClientBuilder; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.ZooKeeperClusterTestCase; import org.apache.distributedlog.impl.acl.ZKAccessControl; import org.apache.distributedlog.thrift.AccessControlEntry; -import com.twitter.util.Await; +import org.apache.distributedlog.common.concurrent.FutureUtils; +import org.apache.distributedlog.util.Utils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.net.URI; - import static com.google.common.base.Charsets.UTF_8; import static org.junit.Assert.*; @@ -60,14 +60,14 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase { ace.setDenyWrite(true); String zkPath = "/create-zk-access-control"; ZKAccessControl zkac = new ZKAccessControl(ace, zkPath); - Await.result(zkac.create(zkc)); + Utils.ioResult(zkac.create(zkc)); - ZKAccessControl readZKAC = Await.result(ZKAccessControl.read(zkc, zkPath, null)); + ZKAccessControl readZKAC = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null)); assertEquals(zkac, readZKAC); ZKAccessControl another = new ZKAccessControl(ace, zkPath); try { - Await.result(another.create(zkc)); + FutureUtils.result(another.create(zkc)); } catch (KeeperException.NodeExistsException ke) { // expected } @@ -81,19 +81,19 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase { ace.setDenyDelete(true); ZKAccessControl zkac = new ZKAccessControl(ace, zkPath); - Await.result(zkac.create(zkc)); + Utils.ioResult(zkac.create(zkc)); - ZKAccessControl readZKAC = Await.result(ZKAccessControl.read(zkc, zkPath, null)); + ZKAccessControl readZKAC = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null)); assertEquals(zkac, readZKAC); - Await.result(ZKAccessControl.delete(zkc, zkPath)); + Utils.ioResult(ZKAccessControl.delete(zkc, zkPath)); try { - Await.result(ZKAccessControl.read(zkc, zkPath, null)); + FutureUtils.result(ZKAccessControl.read(zkc, zkPath, null)); } catch (KeeperException.NoNodeException nne) { // expected. } - Await.result(ZKAccessControl.delete(zkc, zkPath)); + Utils.ioResult(ZKAccessControl.delete(zkc, zkPath)); } @Test(timeout = 60000) @@ -102,7 +102,7 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase { zkc.get().create(zkPath, new byte[0], zkc.getDefaultACL(), CreateMode.PERSISTENT); - ZKAccessControl readZKAC = Await.result(ZKAccessControl.read(zkc, zkPath, null)); + ZKAccessControl readZKAC = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null)); assertEquals(zkPath, readZKAC.getZKPath()); assertEquals(ZKAccessControl.DEFAULT_ACCESS_CONTROL_ENTRY, readZKAC.getAccessControlEntry()); @@ -116,7 +116,7 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase { zkc.get().create(zkPath, "corrupted-data".getBytes(UTF_8), zkc.getDefaultACL(), CreateMode.PERSISTENT); try { - Await.result(ZKAccessControl.read(zkc, zkPath, null)); + Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null)); } catch (ZKAccessControl.CorruptedAccessControlException cace) { // expected } @@ -130,25 +130,25 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase { ace.setDenyDelete(true); ZKAccessControl zkac = new ZKAccessControl(ace, zkPath); - Await.result(zkac.create(zkc)); + Utils.ioResult(zkac.create(zkc)); - ZKAccessControl readZKAC = Await.result(ZKAccessControl.read(zkc, zkPath, null)); + ZKAccessControl readZKAC = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null)); assertEquals(zkac, readZKAC); ace.setDenyRelease(true); ZKAccessControl newZKAC = new ZKAccessControl(ace, zkPath); - Await.result(newZKAC.update(zkc)); - ZKAccessControl readZKAC2 = Await.result(ZKAccessControl.read(zkc, zkPath, null)); + Utils.ioResult(newZKAC.update(zkc)); + ZKAccessControl readZKAC2 = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null)); assertEquals(newZKAC, readZKAC2); try { - Await.result(readZKAC.update(zkc)); + FutureUtils.result(readZKAC.update(zkc)); } catch (KeeperException.BadVersionException bve) { // expected } readZKAC2.getAccessControlEntry().setDenyTruncate(true); - Await.result(readZKAC2.update(zkc)); - ZKAccessControl readZKAC3 = Await.result(ZKAccessControl.read(zkc, zkPath, null)); + Utils.ioResult(readZKAC2.update(zkc)); + ZKAccessControl readZKAC3 = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null)); assertEquals(readZKAC2, readZKAC3); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControlManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControlManager.java b/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControlManager.java index 19c301b..868549e 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControlManager.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControlManager.java @@ -25,7 +25,7 @@ import org.apache.distributedlog.ZooKeeperClusterTestCase; import org.apache.distributedlog.impl.acl.ZKAccessControl; import org.apache.distributedlog.impl.acl.ZKAccessControlManager; import org.apache.distributedlog.thrift.AccessControlEntry; -import com.twitter.util.Await; +import org.apache.distributedlog.util.Utils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -136,7 +136,7 @@ public class TestZKAccessControlManager extends ZooKeeperClusterTestCase { verifyStreamPermissions(zkcm, stream2, true, false, true, true, true); // delete stream2 - Await.result(ZKAccessControl.delete(zkc, zkPath2)); + Utils.ioResult(ZKAccessControl.delete(zkc, zkPath2)); logger.info("Delete ACL for stream {}", stream2); while (!zkcm.allowTruncate(stream2)) { Thread.sleep(100); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java b/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java index 4f968b6..8a2c476 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java @@ -17,21 +17,20 @@ */ package org.apache.distributedlog.admin; -import org.apache.distributedlog.BookKeeperClient; import org.apache.distributedlog.DLMTestUtil; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.LogSegmentMetadata; import org.apache.distributedlog.TestDistributedLogBase; import org.apache.distributedlog.TestZooKeeperClientBuilder; import org.apache.distributedlog.ZooKeeperClient; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater; import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.util.OrderedScheduler; -import org.apache.distributedlog.util.SchedulerUtils; +import org.apache.distributedlog.common.util.SchedulerUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.junit.After; @@ -105,7 +104,7 @@ public class TestDLCK extends TestDistributedLogBase { confLocal.setLogSegmentCacheEnabled(false); URI uri = createDLMURI("/check-and-repair-dl-namespace"); zkc.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(confLocal) .uri(uri) .build(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java b/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java index f911f15..f7f859c 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java @@ -18,14 +18,14 @@ package org.apache.distributedlog.admin; import java.net.URI; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.TestZooKeeperClientBuilder; -import org.apache.distributedlog.annotations.DistributedLogAnnotations; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.common.annotations.DistributedLogAnnotations; import org.apache.distributedlog.exceptions.UnexpectedException; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.util.Utils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; @@ -36,19 +36,16 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.distributedlog.AsyncLogReader; +import org.apache.distributedlog.api.AsyncLogReader; import org.apache.distributedlog.DLMTestUtil; import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.LogRecord; import org.apache.distributedlog.LogRecordWithDLSN; import org.apache.distributedlog.TestDistributedLogBase; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater; import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Future; import static org.junit.Assert.*; @@ -92,11 +89,11 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase { URI uri = createDLMURI("/change-sequence-number"); zooKeeperClient.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(confLocal) .uri(uri) .build(); - DistributedLogNamespace readNamespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace readNamespace = NamespaceBuilder.newBuilder() .conf(readConf) .uri(uri) .build(); @@ -117,7 +114,7 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase { long expectedTxId = 1L; DLSN lastDLSN = DLSN.InitialDLSN; for (int i = 0; i < 4 * 10; i++) { - LogRecordWithDLSN record = Await.result(reader.readNext()); + LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); assertNotNull(record); DLMTestUtil.verifyLogRecord(record); assertEquals(expectedTxId, record.getTransactionId()); @@ -133,9 +130,9 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase { LOG.info("Injected bad log segment '3'"); // there isn't records should be read - Future<LogRecordWithDLSN> readFuture = reader.readNext(); + CompletableFuture<LogRecordWithDLSN> readFuture = reader.readNext(); try { - LogRecordWithDLSN record = Await.result(readFuture); + LogRecordWithDLSN record = Utils.ioResult(readFuture); fail("Should fail reading next record " + record + " when there is a corrupted log segment"); @@ -151,7 +148,7 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase { try { reader = readDLM.getAsyncLogReader(lastDLSN); - Await.result(reader.readNext()); + Utils.ioResult(reader.readNext()); fail("Should fail reading next when there is a corrupted log segment"); } catch (UnexpectedException ue) { // expected @@ -166,18 +163,18 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase { // be able to read more after fix reader = readDLM.getAsyncLogReader(lastDLSN); // skip the first record - Await.result(reader.readNext()); + Utils.ioResult(reader.readNext()); readFuture = reader.readNext(); expectedTxId = 51L; - LogRecord record = Await.result(readFuture); + LogRecord record = Utils.ioResult(readFuture); assertNotNull(record); DLMTestUtil.verifyLogRecord(record); assertEquals(expectedTxId, record.getTransactionId()); expectedTxId++; for (int i = 1; i < 10; i++) { - record = Await.result(reader.readNext()); + record = Utils.ioResult(reader.readNext()); assertNotNull(record); DLMTestUtil.verifyLogRecord(record); assertEquals(expectedTxId, record.getTransactionId()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java b/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java index 2492c06..925cad5 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java @@ -17,22 +17,21 @@ */ package org.apache.distributedlog.bk; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.BookKeeperClient; import org.apache.distributedlog.BookKeeperClientBuilder; import org.apache.distributedlog.TestZooKeeperClientBuilder; -import org.apache.distributedlog.annotations.DistributedLogAnnotations; +import org.apache.distributedlog.common.annotations.DistributedLogAnnotations; import org.apache.distributedlog.bk.SimpleLedgerAllocator.AllocationException; import org.apache.distributedlog.bk.SimpleLedgerAllocator.Phase; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.TestDistributedLogBase; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.exceptions.ZKException; -import org.apache.distributedlog.util.FutureUtils; import org.apache.distributedlog.util.Transaction.OpListener; import org.apache.distributedlog.util.Utils; import org.apache.distributedlog.zk.DefaultZKOp; import org.apache.distributedlog.zk.ZKTransaction; -import com.twitter.util.Future; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; @@ -53,7 +52,6 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.net.URI; import java.util.Enumeration; import java.util.HashSet; @@ -116,13 +114,13 @@ public class TestLedgerAllocator extends TestDistributedLogBase { return new ZKTransaction(zkc); } - private SimpleLedgerAllocator createAllocator(String allocationPath) throws IOException { + private SimpleLedgerAllocator createAllocator(String allocationPath) throws Exception { return createAllocator(allocationPath, dlConf); } private SimpleLedgerAllocator createAllocator(String allocationPath, - DistributedLogConfiguration conf) throws IOException { - return FutureUtils.result(SimpleLedgerAllocator.of(allocationPath, null, newQuorumConfigProvider(conf), zkc, bkc)); + DistributedLogConfiguration conf) throws Exception { + return Utils.ioResult(SimpleLedgerAllocator.of(allocationPath, null, newQuorumConfigProvider(conf), zkc, bkc)); } /** @@ -136,13 +134,13 @@ public class TestLedgerAllocator extends TestDistributedLogBase { SimpleLedgerAllocator allocator = createAllocator(allocationPath); allocator.allocate(); ZKTransaction txn = newTxn(); - LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER)); + LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER)); logger.info("Try obtaining ledger handle {}", lh.getId()); byte[] data = zkc.get().getData(allocationPath, false, null); assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8))); txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null)); try { - FutureUtils.result(txn.execute()); + Utils.ioResult(txn.execute()); fail("Should fail the transaction when setting unexisted path"); } catch (ZKException ke) { // expected @@ -154,9 +152,9 @@ public class TestLedgerAllocator extends TestDistributedLogBase { // Create new transaction to obtain the ledger again. txn = newTxn(); // we could obtain the ledger if it was obtained - LedgerHandle newLh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER)); + LedgerHandle newLh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER)); assertEquals(lh.getId(), newLh.getId()); - FutureUtils.result(txn.execute()); + Utils.ioResult(txn.execute()); data = zkc.get().getData(allocationPath, false, null); assertEquals(0, data.length); Utils.close(allocator); @@ -177,16 +175,16 @@ public class TestLedgerAllocator extends TestDistributedLogBase { allocator1.allocate(); // wait until allocated ZKTransaction txn1 = newTxn(); - LedgerHandle lh = FutureUtils.result(allocator1.tryObtain(txn1, NULL_LISTENER)); + LedgerHandle lh = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER)); allocator2.allocate(); ZKTransaction txn2 = newTxn(); try { - FutureUtils.result(allocator2.tryObtain(txn2, NULL_LISTENER)); + Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER)); fail("Should fail allocating on second allocator as allocator1 is starting allocating something."); - } catch (ZKException zke) { - assertEquals(KeeperException.Code.BADVERSION, zke.getKeeperExceptionCode()); + } catch (ZKException ke) { + assertEquals(KeeperException.Code.BADVERSION, ke.getKeeperExceptionCode()); } - FutureUtils.result(txn1.execute()); + Utils.ioResult(txn1.execute()); Utils.close(allocator1); Utils.close(allocator2); @@ -217,7 +215,7 @@ public class TestLedgerAllocator extends TestDistributedLogBase { ZKTransaction txn1 = newTxn(); try { - FutureUtils.result(allocator1.tryObtain(txn1, NULL_LISTENER)); + Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER)); fail("Should fail allocating ledger if there aren't enough bookies"); } catch (AllocationException ioe) { // expected @@ -241,7 +239,7 @@ public class TestLedgerAllocator extends TestDistributedLogBase { allocator1.allocate(); // wait until allocated ZKTransaction txn1 = newTxn(); - LedgerHandle lh1 = FutureUtils.result(allocator1.tryObtain(txn1, NULL_LISTENER)); + LedgerHandle lh1 = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER)); // Second allocator kicks in stat = new Stat(); @@ -252,16 +250,16 @@ public class TestLedgerAllocator extends TestDistributedLogBase { allocator2.allocate(); // wait until allocated ZKTransaction txn2 = newTxn(); - LedgerHandle lh2 = FutureUtils.result(allocator2.tryObtain(txn2, NULL_LISTENER)); + LedgerHandle lh2 = Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER)); // should fail to commit txn1 as version is changed by second allocator try { - FutureUtils.result(txn1.execute()); + Utils.ioResult(txn1.execute()); fail("Should fail commit obtaining ledger handle from first allocator as allocator is modified by second allocator."); } catch (ZKException ke) { // as expected } - FutureUtils.result(txn2.execute()); + Utils.ioResult(txn2.execute()); Utils.close(allocator1); Utils.close(allocator2); @@ -298,7 +296,7 @@ public class TestLedgerAllocator extends TestDistributedLogBase { allocator.allocate(); ZKTransaction txn = newTxn(); // close during obtaining ledger. - LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER)); + LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER)); Utils.close(allocator); byte[] data = zkc.get().getData(allocationPath, false, null); assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8))); @@ -319,8 +317,8 @@ public class TestLedgerAllocator extends TestDistributedLogBase { allocator.allocate(); ZKTransaction txn = newTxn(); // close during obtaining ledger. - LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER)); - FutureUtils.result(txn.execute()); + LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER)); + Utils.ioResult(txn.execute()); Utils.close(allocator); byte[] data = zkc.get().getData(allocationPath, false, null); assertEquals(0, data.length); @@ -336,10 +334,10 @@ public class TestLedgerAllocator extends TestDistributedLogBase { allocator.allocate(); ZKTransaction txn = newTxn(); // close during obtaining ledger. - LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER)); + LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER)); txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null)); try { - FutureUtils.result(txn.execute()); + Utils.ioResult(txn.execute()); fail("Should fail the transaction when setting unexisted path"); } catch (ZKException ke) { // expected @@ -358,13 +356,13 @@ public class TestLedgerAllocator extends TestDistributedLogBase { SimpleLedgerAllocator allocator = createAllocator(allcationPath); allocator.allocate(); ZKTransaction txn1 = newTxn(); - Future<LedgerHandle> obtainFuture1 = allocator.tryObtain(txn1, NULL_LISTENER); + CompletableFuture<LedgerHandle> obtainFuture1 = allocator.tryObtain(txn1, NULL_LISTENER); ZKTransaction txn2 = newTxn(); - Future<LedgerHandle> obtainFuture2 = allocator.tryObtain(txn2, NULL_LISTENER); - assertTrue(obtainFuture2.isDefined()); - assertTrue(obtainFuture2.isThrow()); + CompletableFuture<LedgerHandle> obtainFuture2 = allocator.tryObtain(txn2, NULL_LISTENER); + assertTrue(obtainFuture2.isDone()); + assertTrue(obtainFuture2.isCompletedExceptionally()); try { - FutureUtils.result(obtainFuture2); + Utils.ioResult(obtainFuture2); fail("Should fail the concurrent obtain since there is already a transaction obtaining the ledger handle"); } catch (SimpleLedgerAllocator.ConcurrentObtainException cbe) { // expected @@ -380,8 +378,8 @@ public class TestLedgerAllocator extends TestDistributedLogBase { for (int i = 0; i < numLedgers; i++) { allocator.allocate(); ZKTransaction txn = newTxn(); - LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER)); - FutureUtils.result(txn.execute()); + LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER)); + Utils.ioResult(txn.execute()); allocatedLedgers.add(lh); } assertEquals(numLedgers, allocatedLedgers.size()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocatorPool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocatorPool.java b/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocatorPool.java index e1aaa0b..a42d688 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocatorPool.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocatorPool.java @@ -24,7 +24,6 @@ import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.TestDistributedLogBase; import org.apache.distributedlog.TestZooKeeperClientBuilder; import org.apache.distributedlog.ZooKeeperClient; -import org.apache.distributedlog.util.FutureUtils; import org.apache.distributedlog.util.Transaction.OpListener; import org.apache.distributedlog.util.Utils; import org.apache.distributedlog.zk.ZKTransaction; @@ -127,7 +126,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase { for (int i = 0; i < numAllocators; i++) { try { pool.allocate(); - FutureUtils.result(pool.tryObtain(newTxn(), NULL_LISTENER)); + Utils.ioResult(pool.tryObtain(newTxn(), NULL_LISTENER)); fail("Should fail to allocate ledger if there are enought bookies"); } catch (SimpleLedgerAllocator.AllocationException ae) { assertEquals(SimpleLedgerAllocator.Phase.ERROR, ae.getPhase()); @@ -136,7 +135,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase { for (int i = 0; i < numAllocators; i++) { try { pool.allocate(); - FutureUtils.result(pool.tryObtain(newTxn(), NULL_LISTENER)); + Utils.ioResult(pool.tryObtain(newTxn(), NULL_LISTENER)); fail("Should fail to allocate ledger if there aren't available allocators"); } catch (SimpleLedgerAllocator.AllocationException ae) { assertEquals(SimpleLedgerAllocator.Phase.ERROR, ae.getPhase()); @@ -159,7 +158,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase { for (int i = 0; i < numAllocators; i++) { ZKTransaction txn = newTxn(); pool.allocate(); - LedgerHandle lh = FutureUtils.result(pool.tryObtain(txn, NULL_LISTENER)); + LedgerHandle lh = Utils.ioResult(pool.tryObtain(txn, NULL_LISTENER)); // get the corresponding ledger allocator SimpleLedgerAllocator sla = pool.getLedgerAllocator(lh); @@ -176,7 +175,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase { String slaPath = allocatePaths.get(i); // execute the transaction to confirm/abort obtain - FutureUtils.result(txn.execute()); + Utils.ioResult(txn.execute()); // introduce error to individual ledger allocator byte[] data = zkc.get().getData(slaPath, false, new Stat()); @@ -188,7 +187,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase { try { pool.allocate(); ZKTransaction txn = newTxn(); - LedgerHandle lh = FutureUtils.result(pool.tryObtain(txn, NULL_LISTENER)); + LedgerHandle lh = Utils.ioResult(pool.tryObtain(txn, NULL_LISTENER)); // get the corresponding ledger allocator SimpleLedgerAllocator sla = pool.getLedgerAllocator(lh); @@ -197,7 +196,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase { logger.info("Allocated ledger {} from path {}", lh.getId(), slaPath); allocatedPathSet.add(slaPath); - FutureUtils.result(txn.execute()); + Utils.ioResult(txn.execute()); ++numSuccess; } catch (IOException ioe) { // continue @@ -229,7 +228,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase { LedgerAllocatorPool pool = new LedgerAllocatorPool(allocationPath, 0, dlConf, zkc, bkc, allocationExecutor); ZKTransaction txn = newTxn(); try { - FutureUtils.result(pool.tryObtain(txn, NULL_LISTENER)); + Utils.ioResult(pool.tryObtain(txn, NULL_LISTENER)); fail("Should fail obtain ledger handle if there is no allocator."); } catch (SimpleLedgerAllocator.AllocationException ae) { fail("Should fail obtain ledger handle if there is no allocator."); @@ -251,8 +250,8 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase { for (int i = 0; i < numLedgers; i++) { pool.allocate(); ZKTransaction txn = newTxn(); - LedgerHandle lh = FutureUtils.result(pool.tryObtain(txn, NULL_LISTENER)); - FutureUtils.result(txn.execute()); + LedgerHandle lh = Utils.ioResult(pool.tryObtain(txn, NULL_LISTENER)); + Utils.ioResult(txn.execute()); allocatedLedgers.add(lh); } assertEquals(numLedgers, allocatedLedgers.size()); @@ -280,8 +279,8 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase { for (int i = 0; i < numLedgers; i++) { pool.allocate(); ZKTransaction txn = newTxn(); - LedgerHandle lh = FutureUtils.result(pool.tryObtain(txn, NULL_LISTENER)); - FutureUtils.result(txn.execute()); + LedgerHandle lh = Utils.ioResult(pool.tryObtain(txn, NULL_LISTENER)); + Utils.ioResult(txn.execute()); lh.close(); allocatedLedgers.putIfAbsent(lh.getId(), lh); logger.info("[thread {}] allocate {}th ledger {}", http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/config/PropertiesWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/config/PropertiesWriter.java b/distributedlog-core/src/test/java/org/apache/distributedlog/config/PropertiesWriter.java index f371007..5efa7e4 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/config/PropertiesWriter.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/config/PropertiesWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,10 +20,12 @@ package org.apache.distributedlog.config; import java.io.File; import java.io.FileOutputStream; import java.util.Properties; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Writer to write properties to files. + */ public class PropertiesWriter { static final Logger LOG = LoggerFactory.getLogger(PropertiesWriter.class); @@ -57,7 +59,7 @@ public class PropertiesWriter { public void save() throws Exception { FileOutputStream outputStream = new FileOutputStream(configFile); properties.store(outputStream, null); - configFile.setLastModified(configFile.lastModified()+1000); + configFile.setLastModified(configFile.lastModified() + 1000); LOG.debug("save modified={}", configFile.lastModified()); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConcurrentBaseConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConcurrentBaseConfiguration.java b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConcurrentBaseConfiguration.java deleted file mode 100644 index 9563511..0000000 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConcurrentBaseConfiguration.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.config; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.*; - -public class TestConcurrentBaseConfiguration { - static final Logger LOG = LoggerFactory.getLogger(TestConcurrentBaseConfiguration.class); - - @Test(timeout = 20000) - public void testBasicOperations() throws Exception { - ConcurrentBaseConfiguration conf = new ConcurrentBaseConfiguration(); - conf.setProperty("prop1", "1"); - assertEquals(1, conf.getInt("prop1")); - conf.setProperty("prop1", "2"); - assertEquals(2, conf.getInt("prop1")); - conf.clearProperty("prop1"); - assertEquals(null, conf.getInteger("prop1", null)); - conf.setProperty("prop1", "1"); - conf.setProperty("prop2", "2"); - assertEquals(1, conf.getInt("prop1")); - assertEquals(2, conf.getInt("prop2")); - conf.clearProperty("prop1"); - assertEquals(null, conf.getInteger("prop1", null)); - assertEquals(2, conf.getInt("prop2")); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConfigurationSubscription.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConfigurationSubscription.java b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConfigurationSubscription.java deleted file mode 100644 index 8420a97..0000000 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConfigurationSubscription.java +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.config; - -import com.google.common.collect.Lists; -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.commons.configuration.event.ConfigurationEvent; -import org.apache.commons.configuration.event.ConfigurationListener; -import org.jmock.lib.concurrent.DeterministicScheduler; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.*; - -/** - * Notes: - * 1. lastModified granularity is platform dependent, generally 1 sec, so we can't wait 1ms for things to - * get picked up. - */ -public class TestConfigurationSubscription { - static final Logger LOG = LoggerFactory.getLogger(TestConfigurationSubscription.class); - - /** - * Give FileChangedReloadingStrategy some time to start reloading - * Make sure now!=lastChecked - * {@link org.apache.commons.configuration.reloading.FileChangedReloadingStrategy#reloadingRequired()} - */ - private void ensureConfigReloaded() throws InterruptedException { - // sleep 1 ms so that System.currentTimeMillis() != - // lastChecked (the time we construct FileChangedReloadingStrategy - Thread.sleep(1); - } - - @Test(timeout = 60000) - public void testReloadConfiguration() throws Exception { - PropertiesWriter writer = new PropertiesWriter(); - FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL()); - ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new DistributedLogConfiguration()); - DeterministicScheduler executorService = new DeterministicScheduler(); - List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder); - ConfigurationSubscription confSub = - new ConfigurationSubscription(conf, fileConfigBuilders, executorService, 100, TimeUnit.MILLISECONDS); - final AtomicReference<ConcurrentBaseConfiguration> confHolder = new AtomicReference<>(); - confSub.registerListener(new org.apache.distributedlog.config.ConfigurationListener() { - @Override - public void onReload(ConcurrentBaseConfiguration conf) { - confHolder.set(conf); - } - }); - assertEquals(null, conf.getProperty("prop1")); - - // add - writer.setProperty("prop1", "1"); - writer.save(); - // ensure the file change reloading event can be triggered - ensureConfigReloaded(); - // reload the config - confSub.reload(); - assertNotNull(confHolder.get()); - assertTrue(conf == confHolder.get()); - assertEquals("1", conf.getProperty("prop1")); - } - - @Test(timeout = 60000) - public void testAddReloadBasicsConfig() throws Exception { - PropertiesWriter writer = new PropertiesWriter(); - DeterministicScheduler mockScheduler = new DeterministicScheduler(); - FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL()); - ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new DistributedLogConfiguration()); - List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder); - ConfigurationSubscription confSub = - new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS); - assertEquals(null, conf.getProperty("prop1")); - - // add - writer.setProperty("prop1", "1"); - writer.save(); - // ensure the file change reloading event can be triggered - ensureConfigReloaded(); - mockScheduler.tick(100, TimeUnit.MILLISECONDS); - assertEquals("1", conf.getProperty("prop1")); - - } - - @Test(timeout = 60000) - public void testInitialConfigLoad() throws Exception { - PropertiesWriter writer = new PropertiesWriter(); - writer.setProperty("prop1", "1"); - writer.setProperty("prop2", "abc"); - writer.setProperty("prop3", "123.0"); - writer.setProperty("prop4", "11132"); - writer.setProperty("prop5", "true"); - writer.save(); - - ScheduledExecutorService mockScheduler = new DeterministicScheduler(); - FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL()); - ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new DistributedLogConfiguration()); - List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder); - ConfigurationSubscription confSub = - new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS); - assertEquals(1, conf.getInt("prop1")); - assertEquals("abc", conf.getString("prop2")); - assertEquals(123.0, conf.getFloat("prop3"), 0); - assertEquals(11132, conf.getInt("prop4")); - assertEquals(true, conf.getBoolean("prop5")); - } - - @Test(timeout = 60000) - public void testExceptionInConfigLoad() throws Exception { - PropertiesWriter writer = new PropertiesWriter(); - writer.setProperty("prop1", "1"); - writer.save(); - - DeterministicScheduler mockScheduler = new DeterministicScheduler(); - FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL()); - ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new DistributedLogConfiguration()); - List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder); - ConfigurationSubscription confSub = - new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS); - - final AtomicInteger count = new AtomicInteger(1); - conf.addConfigurationListener(new ConfigurationListener() { - @Override - public void configurationChanged(ConfigurationEvent event) { - LOG.info("config changed {}", event); - // Throw after so we actually see the update anyway. - if (!event.isBeforeUpdate()) { - count.getAndIncrement(); - throw new RuntimeException("config listener threw and exception"); - } - } - }); - - int i = 0; - int initial = 0; - while (count.get() == initial) { - writer.setProperty("prop1", Integer.toString(i++)); - writer.save(); - mockScheduler.tick(100, TimeUnit.MILLISECONDS); - } - - initial = count.get(); - while (count.get() == initial) { - writer.setProperty("prop1", Integer.toString(i++)); - writer.save(); - mockScheduler.tick(100, TimeUnit.MILLISECONDS); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java index b5d6300..21aa1c9 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java @@ -23,12 +23,13 @@ import com.google.common.base.Optional; import org.apache.distributedlog.DistributedLogConfiguration; import java.io.File; -import java.io.FileNotFoundException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.commons.configuration.ConfigurationException; +import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration; +import org.apache.distributedlog.common.config.ConcurrentConstConfiguration; +import org.apache.distributedlog.common.config.PropertiesWriter; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicDistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicDistributedLogConfiguration.java b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicDistributedLogConfiguration.java index c1ac98a..2731af3 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicDistributedLogConfiguration.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicDistributedLogConfiguration.java @@ -20,6 +20,8 @@ package org.apache.distributedlog.config; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.bk.QuorumConfig; +import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration; +import org.apache.distributedlog.common.config.ConcurrentConstConfiguration; import org.junit.Test; import static org.apache.distributedlog.DistributedLogConfiguration.*; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestConfigurationFeatureProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestConfigurationFeatureProvider.java b/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestConfigurationFeatureProvider.java index 3ce4952..1064a6f 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestConfigurationFeatureProvider.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestConfigurationFeatureProvider.java @@ -17,7 +17,7 @@ */ package org.apache.distributedlog.feature; -import org.apache.distributedlog.config.ConcurrentBaseConfiguration; +import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.feature.SettableFeature; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java b/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java index 5d4472d..f8dd245 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java @@ -18,8 +18,8 @@ package org.apache.distributedlog.feature; import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.annotations.DistributedLogAnnotations; -import org.apache.distributedlog.config.PropertiesWriter; +import org.apache.distributedlog.common.annotations.DistributedLogAnnotations; +import org.apache.distributedlog.common.config.PropertiesWriter; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.stats.NullStatsLogger; import org.junit.Ignore; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java index b2fcbf6..db9fb31 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java @@ -23,7 +23,6 @@ import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.TestDistributedLogBase; import org.apache.distributedlog.TestZooKeeperClientBuilder; import org.apache.distributedlog.ZooKeeperClient; -import org.apache.distributedlog.util.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; import org.apache.distributedlog.util.Utils; import org.apache.zookeeper.CreateMode; @@ -89,12 +88,12 @@ public class TestZKLogMetadataStore extends TestDistributedLogBase { @Test(timeout = 60000) public void testCreateLog() throws Exception { - assertEquals(uri, FutureUtils.result(metadataStore.createLog("test"))); + assertEquals(uri, Utils.ioResult(metadataStore.createLog("test"))); } @Test(timeout = 60000) public void testGetLogLocation() throws Exception { - Optional<URI> uriOptional = FutureUtils.result(metadataStore.getLogLocation("test")); + Optional<URI> uriOptional = Utils.ioResult(metadataStore.getLogLocation("test")); assertTrue(uriOptional.isPresent()); assertEquals(uri, uriOptional.get()); } @@ -107,7 +106,7 @@ public class TestZKLogMetadataStore extends TestDistributedLogBase { logs.add(logName); createLogInNamespace(uri, logName); } - Set<String> result = Sets.newHashSet(FutureUtils.result(metadataStore.getLogs())); + Set<String> result = Sets.newHashSet(Utils.ioResult(metadataStore.getLogs())); assertEquals(10, result.size()); assertTrue(Sets.difference(logs, result).isEmpty()); }