start writing RPC tests

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bee07a52
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bee07a52
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bee07a52

Branch: refs/heads/master
Commit: bee07a525682436ce33e7110b8f6688ec1df285f
Parents: f3ec63c
Author: randgalt <randg...@apache.org>
Authored: Sun Jun 1 16:11:02 2014 -0500
Committer: randgalt <randg...@apache.org>
Committed: Sun Jun 1 16:11:02 2014 -0500

----------------------------------------------------------------------
 .../curator/x/rpc/CuratorProjectionServer.java  |  14 +-
 .../java/org/apache/curator/x/rpc/RpcTests.java | 207 +++++++++++++++++++
 2 files changed, 217 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/bee07a52/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
index 806f06e..a01f462 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
@@ -77,10 +77,7 @@ public class CuratorProjectionServer
             configurationSource = args[0];
         }
 
-        Configuration configuration = new ConfigurationBuilder(configurationSource).build();
-
-        final CuratorProjectionServer server = new CuratorProjectionServer(configuration);
-        server.start();
+        final CuratorProjectionServer server = startServer(configurationSource);
 
         Runnable shutdown = new Runnable()
         {
@@ -94,6 +91,15 @@ public class CuratorProjectionServer
         Runtime.getRuntime().addShutdownHook(hook);
     }
 
+    public static CuratorProjectionServer startServer(String configurationSource) throws Exception
+    {
+        Configuration configuration = new ConfigurationBuilder(configurationSource).build();
+
+        final CuratorProjectionServer server = new CuratorProjectionServer(configuration);
+        server.start();
+        return server;
+    }
+
     public CuratorProjectionServer(Configuration configuration)
     {
         this.configuration = configuration;

http://git-wip-us.apache.org/repos/asf/curator/blob/bee07a52/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java
new file mode 100644
index 0000000..5a83b25
--- /dev/null
+++ b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java
@@ -0,0 +1,207 @@
+package org.apache.curator.x.rpc;
+
+import org.apache.curator.generated.*;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.ObjectNode;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class RpcTests extends BaseClassForTests
+{
+    private Timing timing = new Timing();
+    private CuratorProjectionServer thriftServer;
+    private CuratorService.Client curatorServiceClient;
+    private EventService.Client eventServiceClient;
+    private int thriftPort;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception
+    {
+        super.setup();
+
+        ObjectMapper mapper = new ObjectMapper();
+
+        ObjectNode connectionNode = mapper.createObjectNode();
+        connectionNode.put("name", "test");
+        connectionNode.put("connectionString", server.getConnectString());
+
+        ObjectNode thriftNode = mapper.createObjectNode();
+        thriftPort = InstanceSpec.getRandomPort();
+        thriftNode.put("port", thriftPort);
+
+        ArrayNode connections = mapper.createArrayNode();
+        connections.add(connectionNode);
+
+        ObjectNode node = mapper.createObjectNode();
+        node.put("connections", connections);
+        node.put("thrift", thriftNode);
+
+        final String configurationJson = mapper.writeValueAsString(node);
+
+        thriftServer = CuratorProjectionServer.startServer(configurationJson);
+
+        TSocket clientTransport = new TSocket("localhost", thriftPort);
+        clientTransport.setTimeout(timing.connection());
+        clientTransport.open();
+        TProtocol clientProtocol = new TBinaryProtocol(clientTransport);
+        curatorServiceClient = new CuratorService.Client(clientProtocol);
+
+        TSocket eventTransport = new TSocket("localhost", thriftPort);
+        eventTransport.setTimeout(timing.connection());
+        eventTransport.open();
+        TProtocol eventProtocol = new TBinaryProtocol(eventTransport);
+        eventServiceClient = new EventService.Client(eventProtocol);
+
+    }
+
+    @AfterMethod
+    @Override
+    public void teardown() throws Exception
+    {
+        thriftServer.stop();
+
+        super.teardown();
+    }
+
+    @Test
+    public void testBasic() throws Exception
+    {
+        CuratorProjection curatorProjection = curatorServiceClient.newCuratorProjection("test");
+        CreateSpec spec = new CreateSpec();
+        spec.path = "/test";
+        spec.data = ByteBuffer.wrap("value".getBytes());
+        OptionalPath node = curatorServiceClient.createNode(curatorProjection, spec);
+        Assert.assertEquals(node.path, "/test");
+
+        GetDataSpec dataSpec = new GetDataSpec();
+        dataSpec.path = "/test";
+        OptionalData data = curatorServiceClient.getData(curatorProjection, dataSpec);
+        Assert.assertEquals(data.data, ByteBuffer.wrap("value".getBytes()));
+    }
+
+    @Test
+    public void testEvents() throws Exception
+    {
+        final CuratorProjection curatorProjection = curatorServiceClient.newCuratorProjection("test");
+
+        final CountDownLatch connectedLatch = new CountDownLatch(1);
+        final CountDownLatch nodeCreatedLatch = new CountDownLatch(1);
+        Callable<Void> proc = new Callable<Void>()
+        {
+            @Override
+            public Void call() throws Exception
+            {
+                while ( !Thread.currentThread().isInterrupted() )
+                {
+                    CuratorEvent event = eventServiceClient.getNextEvent(curatorProjection);
+                    if ( event.type == CuratorEventType.CONNECTION_CONNECTED )
+                    {
+                        connectedLatch.countDown();
+                    }
+                    else if ( event.type == CuratorEventType.WATCHED )
+                    {
+                        if ( event.watchedEvent.eventType == EventType.NodeCreated )
+                        {
+                            nodeCreatedLatch.countDown();
+                        }
+                    }
+                }
+                return null;
+            }
+        };
+        Future<Void> eventFuture = ThreadUtils.newSingleThreadExecutor("test").submit(proc);
+
+        Assert.assertTrue(timing.awaitLatch(connectedLatch));
+
+        ExistsSpec spec = new ExistsSpec();
+        spec.path = "/test";
+        spec.watched = true;
+        curatorServiceClient.exists(curatorProjection, spec);
+
+        CreateSpec createSpec = new CreateSpec();
+        createSpec.path = "/test";
+        curatorServiceClient.createNode(curatorProjection, createSpec);
+
+        Assert.assertTrue(timing.awaitLatch(nodeCreatedLatch));
+
+        eventFuture.cancel(true);
+    }
+
+    @Test
+    public void testLockMultiThread() throws Exception
+    {
+        final Timing timing = new Timing();
+
+        TSocket clientTransport = new TSocket("localhost", thriftPort);
+        clientTransport.setTimeout(timing.connection());
+        clientTransport.open();
+        TProtocol clientProtocol = new TBinaryProtocol(clientTransport);
+        final CuratorService.Client secondCuratorServiceClient = new CuratorService.Client(clientProtocol);
+        ExecutorService service = ThreadUtils.newFixedThreadPool(2, "test");
+        ExecutorCompletionService<Void> completer = new ExecutorCompletionService<Void>(service);
+
+        final CountDownLatch lockLatch = new CountDownLatch(2);
+        final AtomicBoolean hasTheLock = new AtomicBoolean();
+        for ( int i = 0; i < 2; ++i )
+        {
+            final CuratorService.Client client = (i == 0) ? curatorServiceClient : secondCuratorServiceClient;
+            Callable<Void> proc = new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
+                {
+                    CuratorProjection curatorProjection = client.newCuratorProjection("test");
+                    LockProjection lockProjection = client.acquireLock(curatorProjection, "/lock", timing.forWaiting().milliseconds());
+                    if ( lockProjection.id == null )
+                    {
+                        throw new Exception("Could not acquire lock");
+                    }
+                    try
+                    {
+                        if ( !hasTheLock.compareAndSet(false, true) )
+                        {
+                            throw new Exception("Two lockers");
+                        }
+
+                        timing.sleepABit();
+                    }
+                    finally
+                    {
+                        hasTheLock.set(false);
+                        lockLatch.countDown();
+                        client.closeGenericProjection(curatorProjection, lockProjection.id);
+                    }
+
+                    return null;
+                }
+            };
+            completer.submit(proc);
+        }
+
+        completer.take().get();
+        completer.take().get();
+
+        Assert.assertTrue(timing.awaitLatch(lockLatch));
+
+        service.shutdownNow();
+    }
+}
+

Reply via email to