wip on doc
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/48b0759b Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/48b0759b Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/48b0759b Branch: refs/heads/curator-rpc Commit: 48b0759b735a889a12f421994f051616b5c62248 Parents: 78460f9 Author: randgalt <randg...@apache.org> Authored: Sun Jun 1 12:52:55 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Sun Jun 1 12:52:55 2014 -0500 ---------------------------------------------------------------------- .../src/site/confluence/deploy.confluence | 4 +- .../src/site/confluence/events.confluence | 4 + .../src/site/confluence/index.confluence | 6 +- .../src/site/confluence/usage.confluence | 85 +++++++++- .../org/apache/curator/x/rpc/TestClient.java | 167 +++++++++++++++++++ .../org/apache/curator/x/rpc/TestServer.java | 34 ++++ 6 files changed, 294 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/48b0759b/curator-x-rpc/src/site/confluence/deploy.confluence ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/site/confluence/deploy.confluence b/curator-x-rpc/src/site/confluence/deploy.confluence index af3b682..8c93cb8 100644 --- a/curator-x-rpc/src/site/confluence/deploy.confluence +++ b/curator-x-rpc/src/site/confluence/deploy.confluence @@ -11,13 +11,13 @@ Curator RPC is built as an "uber" Java JAR and can be downloaded from Maven Cent java -jar curator-x-rpc-VERSION.jar <argument> {noformat} -The argument is either a configuration file or a JSON or YAML string. See [[Configuration|configuration.html]] for details. +The argument is either a configuration file or a JSON or YAML string. Call without the argument for help text. See [[Configuration|configuration.html]] for details. h2. Deploying Curator RPC is designed to have an instance of its Thrift Server co\-located on each client instance that needs to connect to ZooKeeper (see the figure below). Each Curator RPC instance is configured (see [[Configuration|configuration.html]]) to connect to one or more -ZooKeeper clusters. The Curator Framework instances are maintained inside of the Curator RPC instances and RPC clients reference these instances by ID. +ZooKeeper clusters. The Curator Framework instances are maintained inside of the Curator RPC instances and RPC clients reference these instances by name. How you configure your server to launch depends on your environment and other needs. Here are some suggestions: http://git-wip-us.apache.org/repos/asf/curator/blob/48b0759b/curator-x-rpc/src/site/confluence/events.confluence ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/site/confluence/events.confluence b/curator-x-rpc/src/site/confluence/events.confluence new file mode 100644 index 0000000..416b978 --- /dev/null +++ b/curator-x-rpc/src/site/confluence/events.confluence @@ -0,0 +1,4 @@ +[[Curator RPC Proxy|index.html]] / Events + +h1. Events + http://git-wip-us.apache.org/repos/asf/curator/blob/48b0759b/curator-x-rpc/src/site/confluence/index.confluence ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/site/confluence/index.confluence b/curator-x-rpc/src/site/confluence/index.confluence index f29aff9..962a3e1 100644 --- a/curator-x-rpc/src/site/confluence/index.confluence +++ b/curator-x-rpc/src/site/confluence/index.confluence @@ -29,4 +29,8 @@ See the [[Usage Page|usage.html]] for details on using the RPC proxy. h2. Configuration -See the [[Configuration|configuration.html]] for details on configuring the RPC proxy. +See [[Configuration|configuration.html]] for details on configuring the RPC proxy. + +h2. Events + +See [[Events|events.html]] for details on the Curator RPC event loop and its structure. http://git-wip-us.apache.org/repos/asf/curator/blob/48b0759b/curator-x-rpc/src/site/confluence/usage.confluence ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/site/confluence/usage.confluence b/curator-x-rpc/src/site/confluence/usage.confluence index a20c13f..9ddaf97 100644 --- a/curator-x-rpc/src/site/confluence/usage.confluence +++ b/curator-x-rpc/src/site/confluence/usage.confluence @@ -18,10 +18,89 @@ Three Thrift Services are included with the Curator RPC: ||Service||Description|| |CuratorService|The main service for accessing the Curator APIs and recipes| -|EventService|Used to receive out-of-band messages for callbacks, watchers, etc| +|EventService|Used to receive out\-of\-band messages for callbacks, watchers, etc. See [Events|events.html] for details.| |DiscoveryService|Curator's ServiceDiscovery recipe| h2. Concepts -_Projections - Many of the Curator RPC APIs refer to "projections" (e.g. CuratorProjection). A projection is an id that refers -to a real object instance inside of the RPC server. +h4. Projections + +Many of the Curator RPC APIs refer to "projections" (e.g. CuratorProjection). A projection is an id that refers +to a real object instance inside of the RPC server. The projection is a "handle" or "cookie" that directly refers to that instance. + +h4. Thrift Client Equals a Thread + +It's important to remember that each thrift client is the equivalent of a system thread. i.e. you cannot have multiple outstanding +calls in multiple threads with a given client. For each thread, you should allocate a separate client. A Thrift Client maps directly +to a single TCP/IP socket. + +h4. Event Loop + +You must dedicate a separate thread for getting events via the Curator RPC [EventService|events.html]. Curator will report async results, +connection state changes, watcher triggers, etc. via this event loop. + +h4. CuratorProjection Expiration + +If you don't make an API call using a CuratorProjection within the [configured timeout|configuration.html] the projection instance +will be closed and any open recipes, etc. associated with it will be closed. NOTE: calls to the EventService will cause the +CuratorProjection to be "touched". So, as long as your event loop is running your CuratorProjection instance will be kept open. + +h2. Initialization + +After setting up Thrift, create a connection to the CuratorService and the EventService. If you plan on using Curator Discovery, create a connection +to DiscoveryService. Allocate a CuratorProjection instance and then start a thread watching events for that instance. Here is pseudo code: + +{code} +CuratorService.Client curatorService = new CuratorService.Client() +EventService.Client eventService = new EventService.Client() + +curatorProjection = curatorService.newCuratorProjection(name) + +inThread => { + while ( isOpen ) { + event = eventService.getNextEvent(curatorProjection) + ... process event ... + } +} +{code} + +h2. Usage + +Once initialized, use recipes/APIs as needed. Here is an example of using the lock recipe: + +{code} +lockId = client.acquireLock(curatorProjection, "/mylock", 10000); +if lockId.id == null { + // lock attempt failed. Throw exception, etc. +} + +// you now own the lock + +client.closeGenericProjection(curatorProjection, lockId.id); +{code} + +Here is an example of using the path cache: + +{code} +cacheProjection = client.startPathChildrenCache(curatorProjection, "/path", true, false, BUILD_INITIAL_CACHE) + +... + +data = client.getPathChildrenCacheDataForPath(curatorProjection, cacheProjection, "/path/child"); + +... + +// in your event loop, you will get events for the cache. e.g. +event = eventService.getNextEvent(curatorProjection) +if event.type == PATH_CHILDREN_CACHE { + if event.childrenCacheEvent.type == CHILD_UPDATED { + // node described by event.childrenCacheEvent.data has changed + // event.childrenCacheEvent.cachedPath is the path that was passed to startPathChildrenCache() + } +} + +... + +// when done with the cache, close it +client.closeGenericProjection(curatorProjection, cacheProjection.id); +{code} http://git-wip-us.apache.org/repos/asf/curator/blob/48b0759b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java new file mode 100644 index 0000000..43a2ffc --- /dev/null +++ b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.rpc; + +import org.apache.curator.generated.*; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.Executors; + +public class TestClient +{ + public static void main(String[] args) throws Exception + { + TSocket clientTransport = new TSocket("localhost", 8899); + clientTransport.open(); + TProtocol clientProtocol = new TBinaryProtocol(clientTransport); + final CuratorService.Client client = new CuratorService.Client(clientProtocol); + + TSocket eventTransport = new TSocket("localhost", 8899); + eventTransport.open(); + TProtocol eventProtocol = new TBinaryProtocol(eventTransport); + final EventService.Client serviceClient = new EventService.Client(eventProtocol); + + TSocket discoveryTransport = new TSocket("localhost", 8899); + discoveryTransport.open(); + TProtocol discoveryProtocol = new TBinaryProtocol(discoveryTransport); + final DiscoveryService.Client discoveryClient = new DiscoveryService.Client(discoveryProtocol); + + final CuratorProjection curatorProjection = client.newCuratorProjection("test"); + + Executors.newSingleThreadExecutor().submit + ( + new Runnable() + { + @Override + public void run() + { + try + { + //noinspection InfiniteLoopStatement + for(;;) + { + CuratorEvent nextEvent = serviceClient.getNextEvent(curatorProjection); + System.out.println(nextEvent); + } + } + catch ( TException e ) + { + e.printStackTrace(); + } + } + } + ); + + CreateSpec createSpec = new CreateSpec(); + createSpec.path = "/this/should/fail"; + createSpec.data = ByteBuffer.wrap("hey".getBytes()); + try + { + client.createNode(curatorProjection, createSpec); + } + catch ( CuratorException e ) + { + System.out.println("Ex: " + e); + } + + createSpec = new CreateSpec(); + createSpec.path = "/a/b/c"; + createSpec.creatingParentsIfNeeded = true; + createSpec.data = ByteBuffer.wrap("hey".getBytes()); + OptionalPath path = client.createNode(curatorProjection, createSpec); + System.out.println("Path: " + path); + + PathChildrenCacheProjection pathChildrenCacheProjection = client.startPathChildrenCache(curatorProjection, "/a/b", true, false, PathChildrenCacheStartMode.BUILD_INITIAL_CACHE); + + NodeCacheProjection nodeCache = client.startNodeCache(curatorProjection, "/a/b/c", false, true); + ChildData nodeCacheData = client.getNodeCacheData(curatorProjection, nodeCache); + System.out.println("nodeCacheData: " + nodeCacheData); + + List<ChildData> pathChildrenCacheData = client.getPathChildrenCacheData(curatorProjection, pathChildrenCacheProjection); + System.out.println("Child data: " + pathChildrenCacheData); + + GetChildrenSpec getChildrenSpec = new GetChildrenSpec(); + getChildrenSpec.path = "/a"; + OptionalChildrenList children = client.getChildren(curatorProjection, getChildrenSpec); + System.out.println("Children: " + children); + + ChildData pathChildrenCacheDataForPath = client.getPathChildrenCacheDataForPath(curatorProjection, pathChildrenCacheProjection, "/a/b/c"); + System.out.println(pathChildrenCacheDataForPath); + + LockProjection lockId = client.acquireLock(curatorProjection, "/mylock", 1000); + client.closeGenericProjection(curatorProjection, lockId.id); + + GetDataSpec getDataSpec = new GetDataSpec(); + getDataSpec.watched = true; + getDataSpec.path = "/a/b/c"; + ByteBuffer data = client.getData(curatorProjection, getDataSpec); + System.out.println("getData: " + new String(data.array())); + + ExistsSpec existsSpec = new ExistsSpec(); + existsSpec.path = "/a/b/c"; + System.out.println("exists: " + client.exists(curatorProjection, existsSpec)); + + DeleteSpec deleteSpec = new DeleteSpec(); + deleteSpec.path = "/a/b/c"; + client.deleteNode(curatorProjection, deleteSpec); + + System.out.println("exists: " + client.exists(curatorProjection, existsSpec)); + + LeaderResult leader = client.startLeaderSelector(curatorProjection, "/leader", "me", 10000); + System.out.println("Has Leader: " + leader.hasLeadership); + + List<Participant> leaderParticipants = client.getLeaderParticipants(curatorProjection, leader.projection); + System.out.println("Participants: " + leaderParticipants); + + boolean isLeader = client.isLeader(curatorProjection, leader.projection); + System.out.println("isLeader: " + isLeader); + + client.closeGenericProjection(curatorProjection, leader.projection.id); + + pathChildrenCacheData = client.getPathChildrenCacheData(curatorProjection, pathChildrenCacheProjection); + System.out.println("Child data: " + pathChildrenCacheData); + + nodeCacheData = client.getNodeCacheData(curatorProjection, nodeCache); + System.out.println("nodeCacheData: " + nodeCacheData); + + PersistentEphemeralNodeProjection node = client.startPersistentEphemeralNode(curatorProjection, "/my/path", ByteBuffer.wrap("hey".getBytes()), PersistentEphemeralNodeMode.EPHEMERAL); + existsSpec.path = "/my/path"; + OptionalStat nodeExists = client.exists(curatorProjection, existsSpec); + System.out.println("nodeExists: " + nodeExists); + client.closeGenericProjection(curatorProjection, node.id); + + List<LeaseProjection> leaseProjections = client.startSemaphore(curatorProjection, "/semi", 3, 1000, 10); + System.out.println("leaseProjections: " + leaseProjections); + for ( LeaseProjection leaseProjection : leaseProjections ) + { + client.closeGenericProjection(curatorProjection, leaseProjection.id); + } + + DiscoveryInstance yourInstance = discoveryClient.makeDiscoveryInstance("mine", ByteBuffer.wrap(new byte[]{}), 8080); + DiscoveryProjection discovery = discoveryClient.startDiscovery(curatorProjection, "/discovery", yourInstance); + DiscoveryProviderProjection provider = discoveryClient.startProvider(curatorProjection, discovery, "mine", ProviderStrategyType.ROUND_ROBIN, 1000, 3); + + DiscoveryInstance instance = discoveryClient.getInstance(curatorProjection, provider); + System.out.println("Instance: " + instance); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/48b0759b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestServer.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestServer.java b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestServer.java new file mode 100644 index 0000000..3564777 --- /dev/null +++ b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestServer.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.rpc; + +import com.google.common.io.Resources; +import org.apache.curator.test.TestingServer; +import java.nio.charset.Charset; + +public class TestServer +{ + public static void main(String[] args) throws Exception + { + new TestingServer(2181); + + String configurationSource = Resources.toString(Resources.getResource("configuration/test.json"), Charset.defaultCharset()); + CuratorProjectionServer.main(new String[]{configurationSource}); + } +}