start writing RPC tests
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bee07a52 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bee07a52 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bee07a52 Branch: refs/heads/master Commit: bee07a525682436ce33e7110b8f6688ec1df285f Parents: f3ec63c Author: randgalt <randg...@apache.org> Authored: Sun Jun 1 16:11:02 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Sun Jun 1 16:11:02 2014 -0500 ---------------------------------------------------------------------- .../curator/x/rpc/CuratorProjectionServer.java | 14 +- .../java/org/apache/curator/x/rpc/RpcTests.java | 207 +++++++++++++++++++ 2 files changed, 217 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/bee07a52/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java index 806f06e..a01f462 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java @@ -77,10 +77,7 @@ public class CuratorProjectionServer configurationSource = args[0]; } - Configuration configuration = new ConfigurationBuilder(configurationSource).build(); - - final CuratorProjectionServer server = new CuratorProjectionServer(configuration); - server.start(); + final CuratorProjectionServer server = startServer(configurationSource); Runnable shutdown = new Runnable() { @@ -94,6 +91,15 @@ public class CuratorProjectionServer Runtime.getRuntime().addShutdownHook(hook); } + public static CuratorProjectionServer startServer(String configurationSource) throws Exception + { + Configuration configuration = new ConfigurationBuilder(configurationSource).build(); + + final CuratorProjectionServer server = new CuratorProjectionServer(configuration); + server.start(); + return server; + } + public CuratorProjectionServer(Configuration configuration) { this.configuration = configuration; http://git-wip-us.apache.org/repos/asf/curator/blob/bee07a52/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java new file mode 100644 index 0000000..5a83b25 --- /dev/null +++ b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java @@ -0,0 +1,207 @@ +package org.apache.curator.x.rpc; + +import org.apache.curator.generated.*; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.ThreadUtils; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ArrayNode; +import org.codehaus.jackson.node.ObjectNode; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.nio.ByteBuffer; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +public class RpcTests extends BaseClassForTests +{ + private Timing timing = new Timing(); + private CuratorProjectionServer thriftServer; + private CuratorService.Client curatorServiceClient; + private EventService.Client eventServiceClient; + private int thriftPort; + + @BeforeMethod + @Override + public void setup() throws Exception + { + super.setup(); + + ObjectMapper mapper = new ObjectMapper(); + + ObjectNode connectionNode = mapper.createObjectNode(); + connectionNode.put("name", "test"); + connectionNode.put("connectionString", server.getConnectString()); + + ObjectNode thriftNode = mapper.createObjectNode(); + thriftPort = InstanceSpec.getRandomPort(); + thriftNode.put("port", thriftPort); + + ArrayNode connections = mapper.createArrayNode(); + connections.add(connectionNode); + + ObjectNode node = mapper.createObjectNode(); + node.put("connections", connections); + node.put("thrift", thriftNode); + + final String configurationJson = mapper.writeValueAsString(node); + + thriftServer = CuratorProjectionServer.startServer(configurationJson); + + TSocket clientTransport = new TSocket("localhost", thriftPort); + clientTransport.setTimeout(timing.connection()); + clientTransport.open(); + TProtocol clientProtocol = new TBinaryProtocol(clientTransport); + curatorServiceClient = new CuratorService.Client(clientProtocol); + + TSocket eventTransport = new TSocket("localhost", thriftPort); + eventTransport.setTimeout(timing.connection()); + eventTransport.open(); + TProtocol eventProtocol = new TBinaryProtocol(eventTransport); + eventServiceClient = new EventService.Client(eventProtocol); + + } + + @AfterMethod + @Override + public void teardown() throws Exception + { + thriftServer.stop(); + + super.teardown(); + } + + @Test + public void testBasic() throws Exception + { + CuratorProjection curatorProjection = curatorServiceClient.newCuratorProjection("test"); + CreateSpec spec = new CreateSpec(); + spec.path = "/test"; + spec.data = ByteBuffer.wrap("value".getBytes()); + OptionalPath node = curatorServiceClient.createNode(curatorProjection, spec); + Assert.assertEquals(node.path, "/test"); + + GetDataSpec dataSpec = new GetDataSpec(); + dataSpec.path = "/test"; + OptionalData data = curatorServiceClient.getData(curatorProjection, dataSpec); + Assert.assertEquals(data.data, ByteBuffer.wrap("value".getBytes())); + } + + @Test + public void testEvents() throws Exception + { + final CuratorProjection curatorProjection = curatorServiceClient.newCuratorProjection("test"); + + final CountDownLatch connectedLatch = new CountDownLatch(1); + final CountDownLatch nodeCreatedLatch = new CountDownLatch(1); + Callable<Void> proc = new Callable<Void>() + { + @Override + public Void call() throws Exception + { + while ( !Thread.currentThread().isInterrupted() ) + { + CuratorEvent event = eventServiceClient.getNextEvent(curatorProjection); + if ( event.type == CuratorEventType.CONNECTION_CONNECTED ) + { + connectedLatch.countDown(); + } + else if ( event.type == CuratorEventType.WATCHED ) + { + if ( event.watchedEvent.eventType == EventType.NodeCreated ) + { + nodeCreatedLatch.countDown(); + } + } + } + return null; + } + }; + Future<Void> eventFuture = ThreadUtils.newSingleThreadExecutor("test").submit(proc); + + Assert.assertTrue(timing.awaitLatch(connectedLatch)); + + ExistsSpec spec = new ExistsSpec(); + spec.path = "/test"; + spec.watched = true; + curatorServiceClient.exists(curatorProjection, spec); + + CreateSpec createSpec = new CreateSpec(); + createSpec.path = "/test"; + curatorServiceClient.createNode(curatorProjection, createSpec); + + Assert.assertTrue(timing.awaitLatch(nodeCreatedLatch)); + + eventFuture.cancel(true); + } + + @Test + public void testLockMultiThread() throws Exception + { + final Timing timing = new Timing(); + + TSocket clientTransport = new TSocket("localhost", thriftPort); + clientTransport.setTimeout(timing.connection()); + clientTransport.open(); + TProtocol clientProtocol = new TBinaryProtocol(clientTransport); + final CuratorService.Client secondCuratorServiceClient = new CuratorService.Client(clientProtocol); + ExecutorService service = ThreadUtils.newFixedThreadPool(2, "test"); + ExecutorCompletionService<Void> completer = new ExecutorCompletionService<Void>(service); + + final CountDownLatch lockLatch = new CountDownLatch(2); + final AtomicBoolean hasTheLock = new AtomicBoolean(); + for ( int i = 0; i < 2; ++i ) + { + final CuratorService.Client client = (i == 0) ? curatorServiceClient : secondCuratorServiceClient; + Callable<Void> proc = new Callable<Void>() + { + @Override + public Void call() throws Exception + { + CuratorProjection curatorProjection = client.newCuratorProjection("test"); + LockProjection lockProjection = client.acquireLock(curatorProjection, "/lock", timing.forWaiting().milliseconds()); + if ( lockProjection.id == null ) + { + throw new Exception("Could not acquire lock"); + } + try + { + if ( !hasTheLock.compareAndSet(false, true) ) + { + throw new Exception("Two lockers"); + } + + timing.sleepABit(); + } + finally + { + hasTheLock.set(false); + lockLatch.countDown(); + client.closeGenericProjection(curatorProjection, lockProjection.id); + } + + return null; + } + }; + completer.submit(proc); + } + + completer.take().get(); + completer.take().get(); + + Assert.assertTrue(timing.awaitLatch(lockLatch)); + + service.shutdownNow(); + } +} +