wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c8d49ae8 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c8d49ae8 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c8d49ae8 Branch: refs/heads/curator-rpc Commit: c8d49ae877c0ea95302c1339563293f31c1fd5db Parents: 7c99ddb Author: randgalt <randg...@apache.org> Authored: Mon May 26 18:02:05 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Mon May 26 18:02:05 2014 -0500 ---------------------------------------------------------------------- curator-x-rpc/pom.xml | 6 + .../curator/x/rpc/CuratorProjectionServer.java | 5 +- .../org/apache/curator/x/rpc/RpcManager.java | 9 + .../curator/x/rpc/idl/event/EventService.java | 36 +++- .../x/rpc/idl/event/RpcCuratorEvent.java | 60 +++++- .../x/rpc/idl/event/RpcCuratorEventType.java | 8 +- .../x/rpc/idl/projection/CreateSpec.java | 4 +- .../projection/CuratorProjectionService.java | 29 ++- curator-x-rpc/src/main/thrift/curator.thrift | 6 +- .../apache/curator/generated/CreateSpec.java | 37 ++-- .../curator/generated/CuratorEventType.java | 60 +++--- .../apache/curator/generated/EventService.java | 188 +++++++++++++++++-- .../org/apache/curator/x/rpc/TestClient.java | 29 ++- 13 files changed, 410 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/pom.xml ---------------------------------------------------------------------- diff --git a/curator-x-rpc/pom.xml b/curator-x-rpc/pom.xml index 42de982..cf2442c 100644 --- a/curator-x-rpc/pom.xml +++ b/curator-x-rpc/pom.xml @@ -20,5 +20,11 @@ <groupId>com.facebook.swift</groupId> <artifactId>swift-service</artifactId> </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java index 6fcc6d7..f5d5649 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java @@ -31,8 +31,9 @@ public class CuratorProjectionServer { public static void main(String[] args) { - EventService eventService = new EventService(); - CuratorProjectionService projectionService = new CuratorProjectionService(eventService); + RpcManager rpcManager = new RpcManager(); + EventService eventService = new EventService(rpcManager, 5000); // TODO + CuratorProjectionService projectionService = new CuratorProjectionService(rpcManager, eventService); ThriftServiceProcessor processor = new ThriftServiceProcessor(new ThriftCodecManager(), Lists.<ThriftEventHandler>newArrayList(), projectionService, eventService); ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8899)); // TODO server.start(); http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java index 783297d..bdc5138 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java @@ -2,6 +2,7 @@ package org.apache.curator.x.rpc; import com.google.common.collect.Maps; import org.apache.curator.framework.CuratorFramework; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -56,4 +57,12 @@ public class RpcManager CuratorEntry entry = projections.remove(id); return (entry != null) ? entry.client : null; } + + public void touch(List<String> ids) + { + for ( String id : ids ) + { + getClient(id); + } + } } http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java index 206a347..3b7cde9 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java @@ -2,13 +2,28 @@ package org.apache.curator.x.rpc.idl.event; import com.facebook.swift.service.ThriftMethod; import com.facebook.swift.service.ThriftService; +import com.google.common.base.Function; +import com.google.common.collect.Lists; import com.google.common.collect.Queues; +import org.apache.curator.x.rpc.RpcManager; +import org.apache.curator.x.rpc.idl.projection.CuratorProjection; +import javax.annotation.Nullable; +import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; @ThriftService("EventService") public class EventService { private final BlockingQueue<RpcCuratorEvent> events = Queues.newLinkedBlockingQueue(); + private final RpcManager rpcManager; + private final int pingTimeMs; + + public EventService(RpcManager rpcManager, int pingTimeMs) + { + this.rpcManager = rpcManager; + this.pingTimeMs = pingTimeMs; + } public void addEvent(RpcCuratorEvent event) { @@ -16,8 +31,25 @@ public class EventService } @ThriftMethod - public RpcCuratorEvent getNextEvent() throws InterruptedException + public RpcCuratorEvent getNextEvent(List<CuratorProjection> projections) throws InterruptedException { - return events.take(); + if ( projections != null ) + { + List<String> ids = Lists.transform + ( + projections, + new Function<CuratorProjection, String>() + { + @Override + public String apply(CuratorProjection projection) + { + return projection.id; + } + } + ); + rpcManager.touch(ids); + } + RpcCuratorEvent event = events.poll(pingTimeMs, TimeUnit.MILLISECONDS); + return (event != null) ? event : new RpcCuratorEvent(); } } http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java index 6896e89..38a5ec3 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java @@ -24,6 +24,7 @@ import com.google.common.base.Function; import com.google.common.collect.Lists; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.x.rpc.idl.projection.CuratorProjection; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -71,7 +72,17 @@ public class RpcCuratorEvent public RpcCuratorEvent() { - throw new UnsupportedOperationException(); + this.projection = null; + this.type = RpcCuratorEventType.PING; + this.resultCode = 0; + this.path = null; + this.context = null; + this.stat = null; + this.data = null; + this.name = null; + this.children = null; + this.aclList = null; + this.watchedEvent = null; } public RpcCuratorEvent(CuratorProjection projection, CuratorEvent event) @@ -89,6 +100,53 @@ public class RpcCuratorEvent this.watchedEvent = toRpcWatchedEvent(event.getWatchedEvent()); } + public RpcCuratorEvent(CuratorProjection projection, ConnectionState newState) + { + this.projection = projection; + this.type = toRpcCuratorEventType(newState); + this.resultCode = 0; + this.path = null; + this.context = null; + this.stat = null; + this.data = null; + this.name = null; + this.children = null; + this.aclList = null; + this.watchedEvent = null; + } + + private RpcCuratorEventType toRpcCuratorEventType(ConnectionState state) + { + switch ( state ) + { + case CONNECTED: + { + return RpcCuratorEventType.CONNECTION_CONNECTED; + } + + case SUSPENDED: + { + return RpcCuratorEventType.CONNECTION_SUSPENDED; + } + + case RECONNECTED: + { + return RpcCuratorEventType.CONNECTION_RECONNECTED; + } + + case LOST: + { + return RpcCuratorEventType.CONNECTION_LOST; + } + + case READ_ONLY: + { + return RpcCuratorEventType.CONNECTION_READ_ONLY; + } + } + throw new IllegalStateException("Unknown state: " + state); + } + private RpcCuratorEventType toRpcCuratorEventType(CuratorEventType eventType) { switch ( eventType ) http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java index f08aa4a..f8d6468 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java @@ -23,6 +23,7 @@ import com.facebook.swift.codec.ThriftEnum; @ThriftEnum("CuratorEventType") public enum RpcCuratorEventType { + PING, CREATE, DELETE, EXISTS, @@ -33,5 +34,10 @@ public enum RpcCuratorEventType GET_ACL, SET_ACL, WATCHED, - CLOSING + CLOSING, + CONNECTION_CONNECTED, + CONNECTION_SUSPENDED, + CONNECTION_RECONNECTED, + CONNECTION_LOST, + CONNECTION_READ_ONLY } http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java index 8e7acf7..d451b26 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java @@ -28,7 +28,7 @@ public class CreateSpec public String path; @ThriftField(2) - public String data; + public byte[] data; @ThriftField(3) public CreateMode mode; @@ -49,7 +49,7 @@ public class CreateSpec { } - public CreateSpec(String path, String data, CreateMode mode, boolean doAsync, boolean compressed, boolean creatingParentsIfNeeded, boolean withProtection) + public CreateSpec(String path, byte[] data, CreateMode mode, boolean doAsync, boolean compressed, boolean creatingParentsIfNeeded, boolean withProtection) { this.path = path; this.data = data; http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java index 34eff59..bca32d9 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -29,6 +30,8 @@ import org.apache.curator.framework.api.CreateBuilder; import org.apache.curator.framework.api.CreateModable; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.PathAndBytesable; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.x.rpc.RpcManager; import org.apache.curator.x.rpc.idl.event.EventService; @@ -38,11 +41,12 @@ import java.util.UUID; @ThriftService("CuratorService") public class CuratorProjectionService { - private final RpcManager rpcManager = new RpcManager(); + private final RpcManager rpcManager; private final EventService eventService; - public CuratorProjectionService(EventService eventService) + public CuratorProjectionService(RpcManager rpcManager, EventService eventService) { + this.rpcManager = rpcManager; this.eventService = eventService; } @@ -53,7 +57,19 @@ public class CuratorProjectionService String id = UUID.randomUUID().toString(); client.start(); rpcManager.add(id, client); - return new CuratorProjection(id); + final CuratorProjection projection = new CuratorProjection(id); + + ConnectionStateListener listener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + eventService.addEvent(new RpcCuratorEvent(projection, newState)); + } + }; + client.getConnectionStateListenable().addListener(listener); + + return projection; } @ThriftMethod @@ -84,7 +100,10 @@ public class CuratorProjectionService { builder = castBuilder(builder, CreateBuilder.class).withProtection(); } - builder = castBuilder(builder, CreateModable.class).withMode(getRealMode(createSpec.mode)); + if ( createSpec.mode != null ) + { + builder = castBuilder(builder, CreateModable.class).withMode(getRealMode(createSpec.mode)); + } if ( createSpec.doAsync ) { @@ -99,7 +118,7 @@ public class CuratorProjectionService builder = castBuilder(builder, Backgroundable.class).inBackground(backgroundCallback); } - return String.valueOf(castBuilder(builder, PathAndBytesable.class).forPath(createSpec.path, createSpec.data.getBytes())); + return String.valueOf(castBuilder(builder, PathAndBytesable.class).forPath(createSpec.path, createSpec.data)); } private org.apache.zookeeper.CreateMode getRealMode(CreateMode mode) http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/thrift/curator.thrift ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/thrift/curator.thrift b/curator-x-rpc/src/main/thrift/curator.thrift index b9d5beb..100f456 100644 --- a/curator-x-rpc/src/main/thrift/curator.thrift +++ b/curator-x-rpc/src/main/thrift/curator.thrift @@ -8,7 +8,7 @@ enum CreateMode { } enum CuratorEventType { - CREATE, DELETE, EXISTS, GET_DATA, SET_DATA, CHILDREN, SYNC, GET_ACL, SET_ACL, WATCHED, CLOSING + PING, CREATE, DELETE, EXISTS, GET_DATA, SET_DATA, CHILDREN, SYNC, GET_ACL, SET_ACL, WATCHED, CLOSING, CONNECTION_CONNECTED, CONNECTION_SUSPENDED, CONNECTION_RECONNECTED, CONNECTION_LOST, CONNECTION_READ_ONLY } enum EventType { @@ -21,7 +21,7 @@ enum KeeperState { struct CreateSpec { 1: string path; - 2: string data; + 2: binary data; 3: CreateMode mode; 4: bool doAsync; 5: bool compressed; @@ -87,5 +87,5 @@ service CuratorService { } service EventService { - CuratorEvent getNextEvent(); + CuratorEvent getNextEvent(1: list<CuratorProjection> projections); } http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/test/java/org/apache/curator/generated/CreateSpec.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/CreateSpec.java b/curator-x-rpc/src/test/java/org/apache/curator/generated/CreateSpec.java index 67437d3..4488285 100644 --- a/curator-x-rpc/src/test/java/org/apache/curator/generated/CreateSpec.java +++ b/curator-x-rpc/src/test/java/org/apache/curator/generated/CreateSpec.java @@ -50,7 +50,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe } public String path; // required - public String data; // required + public ByteBuffer data; // required /** * * @see CreateMode @@ -153,7 +153,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe tmpMap.put(_Fields.PATH, new org.apache.thrift.meta_data.FieldMetaData("path", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.DATA, new org.apache.thrift.meta_data.FieldMetaData("data", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true))); tmpMap.put(_Fields.MODE, new org.apache.thrift.meta_data.FieldMetaData("mode", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, CreateMode.class))); tmpMap.put(_Fields.DO_ASYNC, new org.apache.thrift.meta_data.FieldMetaData("doAsync", org.apache.thrift.TFieldRequirementType.DEFAULT, @@ -173,7 +173,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe public CreateSpec( String path, - String data, + ByteBuffer data, CreateMode mode, boolean doAsync, boolean compressed, @@ -203,7 +203,8 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe this.path = other.path; } if (other.isSetData()) { - this.data = other.data; + this.data = org.apache.thrift.TBaseHelper.copyBinary(other.data); +; } if (other.isSetMode()) { this.mode = other.mode; @@ -257,11 +258,21 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe } } - public String getData() { - return this.data; + public byte[] getData() { + setData(org.apache.thrift.TBaseHelper.rightSize(data)); + return data == null ? null : data.array(); + } + + public ByteBuffer bufferForData() { + return data; + } + + public CreateSpec setData(byte[] data) { + setData(data == null ? (ByteBuffer)null : ByteBuffer.wrap(data)); + return this; } - public CreateSpec setData(String data) { + public CreateSpec setData(ByteBuffer data) { this.data = data; return this; } @@ -419,7 +430,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe if (value == null) { unsetData(); } else { - setData((String)value); + setData((ByteBuffer)value); } break; @@ -712,7 +723,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe if (this.data == null) { sb.append("null"); } else { - sb.append(this.data); + org.apache.thrift.TBaseHelper.toString(this.data, sb); } first = false; if (!first) sb.append(", "); @@ -794,7 +805,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe break; case 2: // DATA if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { - struct.data = iprot.readString(); + struct.data = iprot.readBinary(); struct.setDataIsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); @@ -862,7 +873,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe } if (struct.data != null) { oprot.writeFieldBegin(DATA_FIELD_DESC); - oprot.writeString(struct.data); + oprot.writeBinary(struct.data); oprot.writeFieldEnd(); } if (struct.mode != null) { @@ -926,7 +937,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe oprot.writeString(struct.path); } if (struct.isSetData()) { - oprot.writeString(struct.data); + oprot.writeBinary(struct.data); } if (struct.isSetMode()) { oprot.writeI32(struct.mode.getValue()); @@ -954,7 +965,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe struct.setPathIsSet(true); } if (incoming.get(1)) { - struct.data = iprot.readString(); + struct.data = iprot.readBinary(); struct.setDataIsSet(true); } if (incoming.get(2)) { http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java index 60350fc..ce31158 100644 --- a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java +++ b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java @@ -12,17 +12,23 @@ import java.util.HashMap; import org.apache.thrift.TEnum; public enum CuratorEventType implements org.apache.thrift.TEnum { - CREATE(0), - DELETE(1), - EXISTS(2), - GET_DATA(3), - SET_DATA(4), - CHILDREN(5), - SYNC(6), - GET_ACL(7), - SET_ACL(8), - WATCHED(9), - CLOSING(10); + PING(0), + CREATE(1), + DELETE(2), + EXISTS(3), + GET_DATA(4), + SET_DATA(5), + CHILDREN(6), + SYNC(7), + GET_ACL(8), + SET_ACL(9), + WATCHED(10), + CLOSING(11), + CONNECTION_CONNECTED(12), + CONNECTION_SUSPENDED(13), + CONNECTION_RECONNECTED(14), + CONNECTION_LOST(15), + CONNECTION_READ_ONLY(16); private final int value; @@ -44,27 +50,39 @@ public enum CuratorEventType implements org.apache.thrift.TEnum { public static CuratorEventType findByValue(int value) { switch (value) { case 0: - return CREATE; + return PING; case 1: - return DELETE; + return CREATE; case 2: - return EXISTS; + return DELETE; case 3: - return GET_DATA; + return EXISTS; case 4: - return SET_DATA; + return GET_DATA; case 5: - return CHILDREN; + return SET_DATA; case 6: - return SYNC; + return CHILDREN; case 7: - return GET_ACL; + return SYNC; case 8: - return SET_ACL; + return GET_ACL; case 9: - return WATCHED; + return SET_ACL; case 10: + return WATCHED; + case 11: return CLOSING; + case 12: + return CONNECTION_CONNECTED; + case 13: + return CONNECTION_SUSPENDED; + case 14: + return CONNECTION_RECONNECTED; + case 15: + return CONNECTION_LOST; + case 16: + return CONNECTION_READ_ONLY; default: return null; } http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/test/java/org/apache/curator/generated/EventService.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/EventService.java b/curator-x-rpc/src/test/java/org/apache/curator/generated/EventService.java index 7c8e294..7b41888 100644 --- a/curator-x-rpc/src/test/java/org/apache/curator/generated/EventService.java +++ b/curator-x-rpc/src/test/java/org/apache/curator/generated/EventService.java @@ -36,13 +36,13 @@ public class EventService { public interface Iface { - public CuratorEvent getNextEvent() throws org.apache.thrift.TException; + public CuratorEvent getNextEvent(List<CuratorProjection> projections) throws org.apache.thrift.TException; } public interface AsyncIface { - public void getNextEvent(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + public void getNextEvent(List<CuratorProjection> projections, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; } @@ -66,15 +66,16 @@ public class EventService { super(iprot, oprot); } - public CuratorEvent getNextEvent() throws org.apache.thrift.TException + public CuratorEvent getNextEvent(List<CuratorProjection> projections) throws org.apache.thrift.TException { - send_getNextEvent(); + send_getNextEvent(projections); return recv_getNextEvent(); } - public void send_getNextEvent() throws org.apache.thrift.TException + public void send_getNextEvent(List<CuratorProjection> projections) throws org.apache.thrift.TException { getNextEvent_args args = new getNextEvent_args(); + args.setProjections(projections); sendBase("getNextEvent", args); } @@ -106,21 +107,24 @@ public class EventService { super(protocolFactory, clientManager, transport); } - public void getNextEvent(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + public void getNextEvent(List<CuratorProjection> projections, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { checkReady(); - getNextEvent_call method_call = new getNextEvent_call(resultHandler, this, ___protocolFactory, ___transport); + getNextEvent_call method_call = new getNextEvent_call(projections, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); } public static class getNextEvent_call extends org.apache.thrift.async.TAsyncMethodCall { - public getNextEvent_call(org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + private List<CuratorProjection> projections; + public getNextEvent_call(List<CuratorProjection> projections, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { super(client, protocolFactory, transport, resultHandler, false); + this.projections = projections; } public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("getNextEvent", org.apache.thrift.protocol.TMessageType.CALL, 0)); getNextEvent_args args = new getNextEvent_args(); + args.setProjections(projections); args.write(prot); prot.writeMessageEnd(); } @@ -167,7 +171,7 @@ public class EventService { public getNextEvent_result getResult(I iface, getNextEvent_args args) throws org.apache.thrift.TException { getNextEvent_result result = new getNextEvent_result(); - result.success = iface.getNextEvent(); + result.success = iface.getNextEvent(args.projections); return result; } } @@ -236,7 +240,7 @@ public class EventService { } public void start(I iface, getNextEvent_args args, org.apache.thrift.async.AsyncMethodCallback<CuratorEvent> resultHandler) throws TException { - iface.getNextEvent(resultHandler); + iface.getNextEvent(args.projections,resultHandler); } } @@ -245,6 +249,7 @@ public class EventService { public static class getNextEvent_args implements org.apache.thrift.TBase<getNextEvent_args, getNextEvent_args._Fields>, java.io.Serializable, Cloneable, Comparable<getNextEvent_args> { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getNextEvent_args"); + private static final org.apache.thrift.protocol.TField PROJECTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("projections", org.apache.thrift.protocol.TType.LIST, (short)1); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -252,10 +257,11 @@ public class EventService { schemes.put(TupleScheme.class, new getNextEvent_argsTupleSchemeFactory()); } + public List<CuratorProjection> projections; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { -; + PROJECTIONS((short)1, "projections"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -270,6 +276,8 @@ public class EventService { */ public static _Fields findByThriftId(int fieldId) { switch(fieldId) { + case 1: // PROJECTIONS + return PROJECTIONS; default: return null; } @@ -308,9 +316,14 @@ public class EventService { return _fieldName; } } + + // isset id assignments public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.PROJECTIONS, new org.apache.thrift.meta_data.FieldMetaData("projections", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, CuratorProjection.class)))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getNextEvent_args.class, metaDataMap); } @@ -318,10 +331,24 @@ public class EventService { public getNextEvent_args() { } + public getNextEvent_args( + List<CuratorProjection> projections) + { + this(); + this.projections = projections; + } + /** * Performs a deep copy on <i>other</i>. */ public getNextEvent_args(getNextEvent_args other) { + if (other.isSetProjections()) { + List<CuratorProjection> __this__projections = new ArrayList<CuratorProjection>(other.projections.size()); + for (CuratorProjection other_element : other.projections) { + __this__projections.add(new CuratorProjection(other_element)); + } + this.projections = __this__projections; + } } public getNextEvent_args deepCopy() { @@ -330,15 +357,66 @@ public class EventService { @Override public void clear() { + this.projections = null; + } + + public int getProjectionsSize() { + return (this.projections == null) ? 0 : this.projections.size(); + } + + public java.util.Iterator<CuratorProjection> getProjectionsIterator() { + return (this.projections == null) ? null : this.projections.iterator(); + } + + public void addToProjections(CuratorProjection elem) { + if (this.projections == null) { + this.projections = new ArrayList<CuratorProjection>(); + } + this.projections.add(elem); + } + + public List<CuratorProjection> getProjections() { + return this.projections; + } + + public getNextEvent_args setProjections(List<CuratorProjection> projections) { + this.projections = projections; + return this; + } + + public void unsetProjections() { + this.projections = null; + } + + /** Returns true if field projections is set (has been assigned a value) and false otherwise */ + public boolean isSetProjections() { + return this.projections != null; + } + + public void setProjectionsIsSet(boolean value) { + if (!value) { + this.projections = null; + } } public void setFieldValue(_Fields field, Object value) { switch (field) { + case PROJECTIONS: + if (value == null) { + unsetProjections(); + } else { + setProjections((List<CuratorProjection>)value); + } + break; + } } public Object getFieldValue(_Fields field) { switch (field) { + case PROJECTIONS: + return getProjections(); + } throw new IllegalStateException(); } @@ -350,6 +428,8 @@ public class EventService { } switch (field) { + case PROJECTIONS: + return isSetProjections(); } throw new IllegalStateException(); } @@ -367,6 +447,15 @@ public class EventService { if (that == null) return false; + boolean this_present_projections = true && this.isSetProjections(); + boolean that_present_projections = true && that.isSetProjections(); + if (this_present_projections || that_present_projections) { + if (!(this_present_projections && that_present_projections)) + return false; + if (!this.projections.equals(that.projections)) + return false; + } + return true; } @@ -383,6 +472,16 @@ public class EventService { int lastComparison = 0; + lastComparison = Boolean.valueOf(isSetProjections()).compareTo(other.isSetProjections()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetProjections()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.projections, other.projections); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -403,6 +502,13 @@ public class EventService { StringBuilder sb = new StringBuilder("getNextEvent_args("); boolean first = true; + sb.append("projections:"); + if (this.projections == null) { + sb.append("null"); + } else { + sb.append(this.projections); + } + first = false; sb.append(")"); return sb.toString(); } @@ -446,6 +552,25 @@ public class EventService { break; } switch (schemeField.id) { + case 1: // PROJECTIONS + if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { + { + org.apache.thrift.protocol.TList _list16 = iprot.readListBegin(); + struct.projections = new ArrayList<CuratorProjection>(_list16.size); + for (int _i17 = 0; _i17 < _list16.size; ++_i17) + { + CuratorProjection _elem18; + _elem18 = new CuratorProjection(); + _elem18.read(iprot); + struct.projections.add(_elem18); + } + iprot.readListEnd(); + } + struct.setProjectionsIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -461,6 +586,18 @@ public class EventService { struct.validate(); oprot.writeStructBegin(STRUCT_DESC); + if (struct.projections != null) { + oprot.writeFieldBegin(PROJECTIONS_FIELD_DESC); + { + oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.projections.size())); + for (CuratorProjection _iter19 : struct.projections) + { + _iter19.write(oprot); + } + oprot.writeListEnd(); + } + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -478,11 +615,40 @@ public class EventService { @Override public void write(org.apache.thrift.protocol.TProtocol prot, getNextEvent_args struct) throws org.apache.thrift.TException { TTupleProtocol oprot = (TTupleProtocol) prot; + BitSet optionals = new BitSet(); + if (struct.isSetProjections()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetProjections()) { + { + oprot.writeI32(struct.projections.size()); + for (CuratorProjection _iter20 : struct.projections) + { + _iter20.write(oprot); + } + } + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, getNextEvent_args struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + { + org.apache.thrift.protocol.TList _list21 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.projections = new ArrayList<CuratorProjection>(_list21.size); + for (int _i22 = 0; _i22 < _list21.size; ++_i22) + { + CuratorProjection _elem23; + _elem23 = new CuratorProjection(); + _elem23.read(iprot); + struct.projections.add(_elem23); + } + } + struct.setProjectionsIsSet(true); + } } } http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java index e479031..79cea30 100644 --- a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java +++ b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java @@ -18,21 +18,26 @@ */ package org.apache.curator.x.rpc; +import org.apache.curator.generated.CreateSpec; import org.apache.curator.generated.CuratorEvent; import org.apache.curator.generated.CuratorProjection; import org.apache.curator.generated.CuratorProjectionSpec; import org.apache.curator.generated.CuratorService; import org.apache.curator.generated.EventService; +import org.apache.curator.test.TestingServer; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TSocket; +import java.util.Arrays; import java.util.concurrent.Executors; public class TestClient { - public static void main(String[] args) throws TException + public static void main(String[] args) throws Exception { + new TestingServer(2181); + TSocket clientTransport = new TSocket("localhost", 8899); clientTransport.open(); TProtocol clientProtocol = new TBinaryProtocol(clientTransport); @@ -43,24 +48,36 @@ public class TestClient TProtocol eventProtocol = new TBinaryProtocol(eventTransport); final EventService.Client serviceClient = new EventService.Client(eventProtocol); + final CuratorProjection curatorProjection = client.newCuratorProjection(new CuratorProjectionSpec()); + Executors.newSingleThreadExecutor().submit - (new Runnable() + ( + new Runnable() { @Override public void run() { try { - CuratorEvent nextEvent = serviceClient.getNextEvent(); - System.out.println(nextEvent.type); + //noinspection InfiniteLoopStatement + for(;;) + { + CuratorEvent nextEvent = serviceClient.getNextEvent(Arrays.asList(curatorProjection)); + System.out.println(nextEvent.type); + } } catch ( TException e ) { e.printStackTrace(); } } - }); + } + ); - CuratorProjection curatorProjection = client.newCuratorProjection(new CuratorProjectionSpec()); + CreateSpec createSpec = new CreateSpec(); + createSpec.path = "/a/b/c"; + createSpec.creatingParentsIfNeeded = true; + String path = client.create(curatorProjection, createSpec); + System.out.println(path); } }