wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f103e02d Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f103e02d Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f103e02d Branch: refs/heads/curator-rpc Commit: f103e02dcc906f9d0387a1e720fa20f8aa479904 Parents: 10e52ad Author: randgalt <randg...@apache.org> Authored: Mon May 26 01:14:22 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Mon May 26 01:14:22 2014 -0500 ---------------------------------------------------------------------- .../curator/x/rpc/CuratorProjectionServer.java | 6 +- .../apache/curator/x/rpc/idl/CreateMode.java | 9 - .../apache/curator/x/rpc/idl/CreateSpec.java | 105 ------- .../curator/x/rpc/idl/CuratorProjection.java | 23 -- .../x/rpc/idl/CuratorProjectionService.java | 154 ---------- .../x/rpc/idl/CuratorProjectionSpec.java | 8 - .../x/rpc/idl/event/CuratorEventService.java | 23 ++ .../x/rpc/idl/event/CuratorRpcEvent.java | 279 +++++++++++++++++++ .../x/rpc/idl/event/CuratorRpcEventType.java | 19 ++ .../apache/curator/x/rpc/idl/event/RpcAcl.java | 43 +++ .../curator/x/rpc/idl/event/RpcEventType.java | 13 + .../apache/curator/x/rpc/idl/event/RpcId.java | 43 +++ .../curator/x/rpc/idl/event/RpcKeeperState.java | 16 ++ .../apache/curator/x/rpc/idl/event/RpcStat.java | 160 +++++++++++ .../x/rpc/idl/event/RpcWatchedEvent.java | 56 ++++ .../x/rpc/idl/projection/CreateMode.java | 9 + .../x/rpc/idl/projection/CreateSpec.java | 105 +++++++ .../x/rpc/idl/projection/CuratorProjection.java | 23 ++ .../projection/CuratorProjectionService.java | 133 +++++++++ .../idl/projection/CuratorProjectionSpec.java | 8 + curator-x-rpc/src/main/scripts/generate.sh | 21 +- .../src/main/thrift/curator-event.thrift | 64 +++++ curator-x-rpc/src/main/thrift/curator.thrift | 5 +- 23 files changed, 1017 insertions(+), 308 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java index 22a5e3a..489d5a6 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java @@ -6,13 +6,15 @@ import com.facebook.swift.service.ThriftServer; import com.facebook.swift.service.ThriftServerConfig; import com.facebook.swift.service.ThriftServiceProcessor; import com.google.common.collect.Lists; -import org.apache.curator.x.rpc.idl.CuratorProjectionService; +import org.apache.curator.x.rpc.idl.event.CuratorEventService; +import org.apache.curator.x.rpc.idl.projection.CuratorProjectionService; public class CuratorProjectionServer { public static void main(String[] args) { - CuratorProjectionService projectionService = new CuratorProjectionService(); + CuratorEventService eventService = new CuratorEventService(); + CuratorProjectionService projectionService = new CuratorProjectionService(eventService); ThriftServiceProcessor processor = new ThriftServiceProcessor(new ThriftCodecManager(), Lists.<ThriftEventHandler>newArrayList(), projectionService); ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8899)); // TODO server.start(); http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CreateMode.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CreateMode.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CreateMode.java deleted file mode 100644 index f35e32b..0000000 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CreateMode.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.apache.curator.x.rpc.idl; - -public enum CreateMode -{ - PERSISTENT, - PERSISTENT_SEQUENTIAL, - EPHEMERAL, - EPHEMERAL_SEQUENTIAL -} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CreateSpec.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CreateSpec.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CreateSpec.java deleted file mode 100644 index f401fb5..0000000 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CreateSpec.java +++ /dev/null @@ -1,105 +0,0 @@ -package org.apache.curator.x.rpc.idl; - -import com.facebook.swift.codec.ThriftField; -import com.facebook.swift.codec.ThriftStruct; - -@ThriftStruct -public class CreateSpec -{ - private String path; - private String data; - private CreateMode mode; - private boolean async; - private String asyncId; - private boolean compressed; - private boolean creatingParentsIfNeeded; - private boolean withProtection; - - @ThriftField(1) - public String getPath() - { - return path; - } - - @ThriftField(2) - public String getAsyncId() - { - return asyncId; - } - - public void setAsyncId(String asyncId) - { - this.asyncId = asyncId; - } - - public void setPath(String path) - { - this.path = path; - } - - @ThriftField(3) - public String getData() - { - return data; - } - - public void setData(String data) - { - this.data = data; - } - - @ThriftField(4) - public CreateMode getMode() - { - return mode; - } - - public void setMode(CreateMode mode) - { - this.mode = mode; - } - - @ThriftField(5) - public boolean isAsync() - { - return async; - } - - public void setAsync(boolean async) - { - this.async = async; - } - - @ThriftField(6) - public boolean isCompressed() - { - return compressed; - } - - public void setCompressed(boolean compressed) - { - this.compressed = compressed; - } - - @ThriftField(7) - public boolean isCreatingParentsIfNeeded() - { - return creatingParentsIfNeeded; - } - - public void setCreatingParentsIfNeeded(boolean creatingParentsIfNeeded) - { - this.creatingParentsIfNeeded = creatingParentsIfNeeded; - } - - @ThriftField(8) - public boolean isWithProtection() - { - return withProtection; - } - - public void setWithProtection(boolean withProtection) - { - this.withProtection = withProtection; - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CuratorProjection.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CuratorProjection.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CuratorProjection.java deleted file mode 100644 index 85864c5..0000000 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CuratorProjection.java +++ /dev/null @@ -1,23 +0,0 @@ -package org.apache.curator.x.rpc.idl; - -import com.facebook.swift.codec.ThriftConstructor; -import com.facebook.swift.codec.ThriftField; -import com.facebook.swift.codec.ThriftStruct; - -@ThriftStruct -public class CuratorProjection -{ - private final String id; - - @ThriftConstructor - public CuratorProjection(String id) - { - this.id = id; - } - - @ThriftField(1) - public String getId() - { - return id; - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CuratorProjectionService.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CuratorProjectionService.java deleted file mode 100644 index 8acfd0f..0000000 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CuratorProjectionService.java +++ /dev/null @@ -1,154 +0,0 @@ -package org.apache.curator.x.rpc.idl; - -import com.facebook.swift.service.ThriftMethod; -import com.facebook.swift.service.ThriftService; -import com.google.common.collect.Maps; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.Compressible; -import org.apache.curator.framework.api.CreateBuilder; -import org.apache.curator.framework.api.CreateModable; -import org.apache.curator.framework.api.PathAndBytesable; -import org.apache.curator.retry.RetryOneTime; -import java.util.Map; -import java.util.UUID; - -@ThriftService("curator") -public class CuratorProjectionService -{ - private final Map<String, CuratorFramework> projections = Maps.newConcurrentMap(); - - @ThriftMethod - public CuratorProjection newCuratorProjection(CuratorProjectionSpec spec) - { - CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryOneTime(1)); - String id = UUID.randomUUID().toString(); - client.start(); - projections.put(id, client); - return new CuratorProjection(id); - } - - @ThriftMethod - public void closeCuratorProjection(CuratorProjection projection) - { - CuratorFramework client = projections.remove(projection.getId()); - if ( client != null ) - { - client.close(); - } - } - - //@ThriftMethod - public String create(CuratorProjection projection, CreateSpec createSpec) throws Exception - { - CuratorFramework client = getClient(projection); - - Object builder = client.create(); - if ( createSpec.isCreatingParentsIfNeeded() ) - { - builder = castBuilder(builder, CreateBuilder.class).creatingParentsIfNeeded(); - } - if ( createSpec.isCompressed() ) - { - builder = castBuilder(builder, Compressible.class).compressed(); - } - if ( createSpec.isWithProtection() ) - { - builder = castBuilder(builder, CreateBuilder.class).withProtection(); - } - builder = castBuilder(builder, CreateModable.class).withMode(getRealMode(createSpec.getMode())); - - if ( createSpec.isAsync() ) - { -/* -TODO - BackgroundCallback backgroundCallback = new RestBackgroundCallback(context, Constants.CLIENT_CREATE_ASYNC, createSpec.getAsyncId()) - { - @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception - { - if ( event.getResultCode() == 0 ) - { - checkEphemeralCreate(createSpec, id, event.getName()); - } - super.processResult(client, event); - } - - @Override - protected String getMessage(CuratorEvent event) - { - PathAndId pathAndId = new PathAndId(String.valueOf(event.getName()), id); - try - { - return context.getWriter().writeValueAsString(pathAndId); - } - catch ( IOException e ) - { - log.error("Could not serialize PathAndId", e); - } - return "{}"; - } - - @Override - protected String getDetails(CuratorEvent event) - { - if ( event.getResultCode() != 0 ) - { - return super.getDetails(event); - } - return id; - } - }; - builder = castBuilder(builder, Backgroundable.class).inBackground(backgroundCallback); -*/ - } - - return String.valueOf(castBuilder(builder, PathAndBytesable.class).forPath(createSpec.getPath(), createSpec.getData().getBytes())); - } - - private org.apache.zookeeper.CreateMode getRealMode(CreateMode mode) - { - switch ( mode ) - { - case PERSISTENT: - { - return org.apache.zookeeper.CreateMode.PERSISTENT; - } - - case PERSISTENT_SEQUENTIAL: - { - return org.apache.zookeeper.CreateMode.PERSISTENT_SEQUENTIAL; - } - - case EPHEMERAL: - { - return org.apache.zookeeper.CreateMode.EPHEMERAL; - } - - case EPHEMERAL_SEQUENTIAL: - { - return org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL; - } - } - throw new UnsupportedOperationException("Bad mode: " + mode.toString()); - } - - private CuratorFramework getClient(CuratorProjection projection) throws Exception - { - CuratorFramework client = projections.get(projection.getId()); - if ( client == null ) - { - throw new Exception("No client found with id: " + projection.getId()); - } - return client; - } - - private static <T> T castBuilder(Object createBuilder, Class<T> clazz) throws Exception - { - if ( clazz.isAssignableFrom(createBuilder.getClass()) ) - { - return clazz.cast(createBuilder); - } - throw new Exception("That operation is not available"); // TODO - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CuratorProjectionSpec.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CuratorProjectionSpec.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CuratorProjectionSpec.java deleted file mode 100644 index cf45632..0000000 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/CuratorProjectionSpec.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.apache.curator.x.rpc.idl; - -import com.facebook.swift.codec.ThriftStruct; - -@ThriftStruct -public class CuratorProjectionSpec -{ -} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/CuratorEventService.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/CuratorEventService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/CuratorEventService.java new file mode 100644 index 0000000..844de0d --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/CuratorEventService.java @@ -0,0 +1,23 @@ +package org.apache.curator.x.rpc.idl.event; + +import com.facebook.swift.service.ThriftMethod; +import com.facebook.swift.service.ThriftService; +import com.google.common.collect.Queues; +import java.util.concurrent.BlockingQueue; + +@ThriftService("CuratorEventService") +public class CuratorEventService +{ + private final BlockingQueue<CuratorRpcEvent> events = Queues.newLinkedBlockingQueue(); + + @ThriftMethod + public CuratorRpcEvent getNextEvent() throws InterruptedException + { + return events.take(); + } + + public void addEvent(CuratorRpcEvent event) + { + events.offer(event); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/CuratorRpcEvent.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/CuratorRpcEvent.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/CuratorRpcEvent.java new file mode 100644 index 0000000..db2b278 --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/CuratorRpcEvent.java @@ -0,0 +1,279 @@ +package org.apache.curator.x.rpc.idl.event; + +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.codec.ThriftStruct; +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.x.rpc.idl.projection.CuratorProjection; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import javax.annotation.Nullable; +import java.util.List; + +@SuppressWarnings("deprecation") +@ThriftStruct("CuratorEvent") +public class CuratorRpcEvent +{ + private final CuratorProjection projection; + private final CuratorEvent event; + + public CuratorRpcEvent() + { + throw new UnsupportedOperationException(); + } + + public CuratorRpcEvent(CuratorProjection projection, CuratorEvent event) + { + this.projection = projection; + this.event = event; + } + + @ThriftField(1) + public CuratorProjection getProjection() + { + return projection; + } + + @ThriftField(2) + public CuratorRpcEventType getType() + { + switch ( event.getType() ) + { + case CREATE: + { + return CuratorRpcEventType.CREATE; + } + + case DELETE: + { + return CuratorRpcEventType.DELETE; + } + + case EXISTS: + { + return CuratorRpcEventType.EXISTS; + } + + case GET_DATA: + { + return CuratorRpcEventType.GET_DATA; + } + + case SET_DATA: + { + return CuratorRpcEventType.SET_DATA; + } + + case CHILDREN: + { + return CuratorRpcEventType.CHILDREN; + } + + case SYNC: + { + return CuratorRpcEventType.SYNC; + } + + case GET_ACL: + { + return CuratorRpcEventType.GET_ACL; + } + + case SET_ACL: + { + return CuratorRpcEventType.SET_ACL; + } + + case WATCHED: + { + return CuratorRpcEventType.WATCHED; + } + + case CLOSING: + { + return CuratorRpcEventType.CLOSING; + } + } + + throw new IllegalStateException("Unknown type: " + event.getType()); + } + + @ThriftField(3) + public int getResultCode() + { + return event.getResultCode(); + } + + @ThriftField(4) + public String getPath() + { + return event.getPath(); + } + + @ThriftField(5) + public String getContext() + { + return String.valueOf(event.getContext()); + } + + @ThriftField(6) + public RpcStat getStat() + { + Stat stat = event.getStat(); + if ( stat != null ) + { + return new RpcStat + ( + stat.getCzxid(), + stat.getMzxid(), + stat.getCtime(), + stat.getMtime(), + stat.getVersion(), + stat.getCversion(), + stat.getAversion(), + stat.getEphemeralOwner(), + stat.getDataLength(), + stat.getNumChildren(), + stat.getPzxid() + ); + } + return null; + } + + @ThriftField(7) + public byte[] getData() + { + return event.getData(); + } + + @ThriftField(8) + public String getName() + { + return event.getPath(); + } + + @ThriftField(9) + public List<String> getChildren() + { + return event.getChildren(); + } + + @ThriftField(10) + public List<RpcAcl> getACLList() + { + List<ACL> aclList = event.getACLList(); + if ( aclList != null ) + { + return Lists.transform + ( + aclList, + new Function<ACL, RpcAcl>() + { + @Nullable + @Override + public RpcAcl apply(ACL acl) + { + RpcId id = new RpcId(acl.getId().getScheme(), acl.getId().getId()); + return new RpcAcl(acl.getPerms(), id); + } + } + ); + } + return null; + } + + @ThriftField(11) + public RpcWatchedEvent getWatchedEvent() + { + WatchedEvent watchedEvent = event.getWatchedEvent(); + if ( watchedEvent != null ) + { + RpcKeeperState keeperState = toRpcKeeperState(watchedEvent.getState()); + RpcEventType eventType = toRpcEventType(watchedEvent.getType()); + return new RpcWatchedEvent(keeperState, eventType, watchedEvent.getPath()); + } + return null; + } + + private RpcEventType toRpcEventType(Watcher.Event.EventType type) + { + switch ( type ) + { + case None: + { + return RpcEventType.None; + } + + case NodeCreated: + { + return RpcEventType.NodeCreated; + } + + case NodeDeleted: + { + return RpcEventType.NodeDeleted; + } + + case NodeDataChanged: + { + return RpcEventType.NodeDataChanged; + } + + case NodeChildrenChanged: + { + return RpcEventType.NodeChildrenChanged; + } + } + throw new IllegalStateException("Unknown type: " + type); + } + + private RpcKeeperState toRpcKeeperState(Watcher.Event.KeeperState state) + { + switch ( state ) + { + case Unknown: + { + return RpcKeeperState.Unknown; + } + + case Disconnected: + { + return RpcKeeperState.Disconnected; + } + + case NoSyncConnected: + { + return RpcKeeperState.NoSyncConnected; + } + + case SyncConnected: + { + return RpcKeeperState.SyncConnected; + } + + case AuthFailed: + { + return RpcKeeperState.AuthFailed; + } + + case ConnectedReadOnly: + { + return RpcKeeperState.ConnectedReadOnly; + } + + case SaslAuthenticated: + { + return RpcKeeperState.SaslAuthenticated; + } + + case Expired: + { + return RpcKeeperState.Expired; + } + } + throw new IllegalStateException("Unknown state: " + state); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/CuratorRpcEventType.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/CuratorRpcEventType.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/CuratorRpcEventType.java new file mode 100644 index 0000000..b905075 --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/CuratorRpcEventType.java @@ -0,0 +1,19 @@ +package org.apache.curator.x.rpc.idl.event; + +import com.facebook.swift.codec.ThriftEnum; + +@ThriftEnum("CuratorEventType") +public enum CuratorRpcEventType +{ + CREATE, + DELETE, + EXISTS, + GET_DATA, + SET_DATA, + CHILDREN, + SYNC, + GET_ACL, + SET_ACL, + WATCHED, + CLOSING +} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcAcl.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcAcl.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcAcl.java new file mode 100644 index 0000000..54daa11 --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcAcl.java @@ -0,0 +1,43 @@ +package org.apache.curator.x.rpc.idl.event; + +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.codec.ThriftStruct; + +@ThriftStruct("Acl") +public class RpcAcl +{ + private int perms; + private RpcId id; + + public RpcAcl() + { + } + + public RpcAcl(int perms, RpcId id) + { + this.perms = perms; + this.id = id; + } + + @ThriftField(1) + public int getPerms() + { + return perms; + } + + public void setPerms(int perms) + { + this.perms = perms; + } + + @ThriftField(2) + public RpcId getId() + { + return id; + } + + public void setId(RpcId id) + { + this.id = id; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcEventType.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcEventType.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcEventType.java new file mode 100644 index 0000000..8328940 --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcEventType.java @@ -0,0 +1,13 @@ +package org.apache.curator.x.rpc.idl.event; + +import com.facebook.swift.codec.ThriftEnum; + +@ThriftEnum("EventType") +public enum RpcEventType +{ + None, + NodeCreated, + NodeDeleted, + NodeDataChanged, + NodeChildrenChanged +} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcId.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcId.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcId.java new file mode 100644 index 0000000..858dea1 --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcId.java @@ -0,0 +1,43 @@ +package org.apache.curator.x.rpc.idl.event; + +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.codec.ThriftStruct; + +@ThriftStruct("id") +public class RpcId +{ + private String scheme; + private String id; + + public RpcId() + { + } + + public RpcId(String scheme, String id) + { + this.scheme = scheme; + this.id = id; + } + + @ThriftField(1) + public String getScheme() + { + return scheme; + } + + public void setScheme(String scheme) + { + this.scheme = scheme; + } + + @ThriftField(2) + public String getId() + { + return id; + } + + public void setId(String id) + { + this.id = id; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcKeeperState.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcKeeperState.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcKeeperState.java new file mode 100644 index 0000000..e5881a8 --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcKeeperState.java @@ -0,0 +1,16 @@ +package org.apache.curator.x.rpc.idl.event; + +import com.facebook.swift.codec.ThriftEnum; + +@ThriftEnum("KeeperState") +public enum RpcKeeperState +{ + Unknown, + Disconnected, + NoSyncConnected, + SyncConnected, + AuthFailed, + ConnectedReadOnly, + SaslAuthenticated, + Expired +} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcStat.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcStat.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcStat.java new file mode 100644 index 0000000..382c45a --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcStat.java @@ -0,0 +1,160 @@ +package org.apache.curator.x.rpc.idl.event; + +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.codec.ThriftStruct; + +@ThriftStruct("Stat") +public class RpcStat +{ + private long czxid; + private long mzxid; + private long ctime; + private long mtime; + private int version; + private int cversion; + private int aversion; + private long ephemeralOwner; + private int dataLength; + private int numChildren; + private long pzxid; + + public RpcStat() + { + } + + public RpcStat(long czxid, long mzxid, long ctime, long mtime, int version, int cversion, int aversion, long ephemeralOwner, int dataLength, int numChildren, long pzxid) + { + this.czxid = czxid; + this.mzxid = mzxid; + this.ctime = ctime; + this.mtime = mtime; + this.version = version; + this.cversion = cversion; + this.aversion = aversion; + this.ephemeralOwner = ephemeralOwner; + this.dataLength = dataLength; + this.numChildren = numChildren; + this.pzxid = pzxid; + } + + @ThriftField(1) + public long getCzxid() + { + return czxid; + } + + public void setCzxid(long czxid) + { + this.czxid = czxid; + } + + @ThriftField(2) + public long getMzxid() + { + return mzxid; + } + + public void setMzxid(long mzxid) + { + this.mzxid = mzxid; + } + + @ThriftField(3) + public long getCtime() + { + return ctime; + } + + public void setCtime(long ctime) + { + this.ctime = ctime; + } + + @ThriftField(4) + public long getMtime() + { + return mtime; + } + + public void setMtime(long mtime) + { + this.mtime = mtime; + } + + @ThriftField(5) + public int getVersion() + { + return version; + } + + public void setVersion(int version) + { + this.version = version; + } + + @ThriftField(6) + public int getCversion() + { + return cversion; + } + + public void setCversion(int cversion) + { + this.cversion = cversion; + } + + @ThriftField(7) + public int getAversion() + { + return aversion; + } + + public void setAversion(int aversion) + { + this.aversion = aversion; + } + + @ThriftField(8) + public long getEphemeralOwner() + { + return ephemeralOwner; + } + + public void setEphemeralOwner(long ephemeralOwner) + { + this.ephemeralOwner = ephemeralOwner; + } + + @ThriftField(9) + public int getDataLength() + { + return dataLength; + } + + public void setDataLength(int dataLength) + { + this.dataLength = dataLength; + } + + @ThriftField(10) + public int getNumChildren() + { + return numChildren; + } + + public void setNumChildren(int numChildren) + { + this.numChildren = numChildren; + } + + @ThriftField(11) + public long getPzxid() + { + return pzxid; + } + + public void setPzxid(long pzxid) + { + this.pzxid = pzxid; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcWatchedEvent.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcWatchedEvent.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcWatchedEvent.java new file mode 100644 index 0000000..69187dc --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcWatchedEvent.java @@ -0,0 +1,56 @@ +package org.apache.curator.x.rpc.idl.event; + +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.codec.ThriftStruct; + +@ThriftStruct("WatchedEvent") +public class RpcWatchedEvent +{ + private RpcKeeperState keeperState; + private RpcEventType eventType; + private String path; + + public RpcWatchedEvent() + { + } + + public RpcWatchedEvent(RpcKeeperState keeperState, RpcEventType eventType, String path) + { + this.keeperState = keeperState; + this.eventType = eventType; + this.path = path; + } + + @ThriftField(1) + public RpcKeeperState getKeeperState() + { + return keeperState; + } + + public void setKeeperState(RpcKeeperState keeperState) + { + this.keeperState = keeperState; + } + + @ThriftField(2) + public RpcEventType getEventType() + { + return eventType; + } + + public void setEventType(RpcEventType eventType) + { + this.eventType = eventType; + } + + @ThriftField(3) + public String getPath() + { + return path; + } + + public void setPath(String path) + { + this.path = path; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateMode.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateMode.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateMode.java new file mode 100644 index 0000000..ea4ff84 --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateMode.java @@ -0,0 +1,9 @@ +package org.apache.curator.x.rpc.idl.projection; + +public enum CreateMode +{ + PERSISTENT, + PERSISTENT_SEQUENTIAL, + EPHEMERAL, + EPHEMERAL_SEQUENTIAL +} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java new file mode 100644 index 0000000..9019671 --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java @@ -0,0 +1,105 @@ +package org.apache.curator.x.rpc.idl.projection; + +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.codec.ThriftStruct; + +@ThriftStruct +public class CreateSpec +{ + private String path; + private String data; + private CreateMode mode; + private boolean async; + private String asyncId; + private boolean compressed; + private boolean creatingParentsIfNeeded; + private boolean withProtection; + + @ThriftField(1) + public String getPath() + { + return path; + } + + @ThriftField(2) + public String getAsyncId() + { + return asyncId; + } + + public void setAsyncId(String asyncId) + { + this.asyncId = asyncId; + } + + public void setPath(String path) + { + this.path = path; + } + + @ThriftField(3) + public String getData() + { + return data; + } + + public void setData(String data) + { + this.data = data; + } + + @ThriftField(4) + public CreateMode getMode() + { + return mode; + } + + public void setMode(CreateMode mode) + { + this.mode = mode; + } + + @ThriftField(5) + public boolean isAsync() + { + return async; + } + + public void setAsync(boolean async) + { + this.async = async; + } + + @ThriftField(6) + public boolean isCompressed() + { + return compressed; + } + + public void setCompressed(boolean compressed) + { + this.compressed = compressed; + } + + @ThriftField(7) + public boolean isCreatingParentsIfNeeded() + { + return creatingParentsIfNeeded; + } + + public void setCreatingParentsIfNeeded(boolean creatingParentsIfNeeded) + { + this.creatingParentsIfNeeded = creatingParentsIfNeeded; + } + + @ThriftField(8) + public boolean isWithProtection() + { + return withProtection; + } + + public void setWithProtection(boolean withProtection) + { + this.withProtection = withProtection; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjection.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjection.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjection.java new file mode 100644 index 0000000..053d825 --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjection.java @@ -0,0 +1,23 @@ +package org.apache.curator.x.rpc.idl.projection; + +import com.facebook.swift.codec.ThriftConstructor; +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.codec.ThriftStruct; + +@ThriftStruct +public class CuratorProjection +{ + private final String id; + + @ThriftConstructor + public CuratorProjection(String id) + { + this.id = id; + } + + @ThriftField(1) + public String getId() + { + return id; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java new file mode 100644 index 0000000..aeb52c8 --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java @@ -0,0 +1,133 @@ +package org.apache.curator.x.rpc.idl.projection; + +import com.facebook.swift.service.ThriftMethod; +import com.facebook.swift.service.ThriftService; +import com.google.common.collect.Maps; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.Backgroundable; +import org.apache.curator.framework.api.Compressible; +import org.apache.curator.framework.api.CreateBuilder; +import org.apache.curator.framework.api.CreateModable; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.PathAndBytesable; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.x.rpc.idl.event.CuratorEventService; +import org.apache.curator.x.rpc.idl.event.CuratorRpcEvent; +import java.util.Map; +import java.util.UUID; + +@ThriftService("CuratorService") +public class CuratorProjectionService +{ + private final Map<String, CuratorFramework> projections = Maps.newConcurrentMap(); + private final CuratorEventService eventService; + + public CuratorProjectionService(CuratorEventService eventService) + { + this.eventService = eventService; + } + + @ThriftMethod + public CuratorProjection newCuratorProjection(CuratorProjectionSpec spec) + { + CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryOneTime(1)); + String id = UUID.randomUUID().toString(); + client.start(); + projections.put(id, client); + return new CuratorProjection(id); + } + + @ThriftMethod + public void closeCuratorProjection(CuratorProjection projection) + { + CuratorFramework client = projections.remove(projection.getId()); + if ( client != null ) + { + client.close(); + } + } + + @ThriftMethod + public String create(final CuratorProjection projection, CreateSpec createSpec) throws Exception + { + CuratorFramework client = getClient(projection); + + Object builder = client.create(); + if ( createSpec.isCreatingParentsIfNeeded() ) + { + builder = castBuilder(builder, CreateBuilder.class).creatingParentsIfNeeded(); + } + if ( createSpec.isCompressed() ) + { + builder = castBuilder(builder, Compressible.class).compressed(); + } + if ( createSpec.isWithProtection() ) + { + builder = castBuilder(builder, CreateBuilder.class).withProtection(); + } + builder = castBuilder(builder, CreateModable.class).withMode(getRealMode(createSpec.getMode())); + + if ( createSpec.isAsync() ) + { + BackgroundCallback backgroundCallback = new BackgroundCallback() + { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + eventService.addEvent(new CuratorRpcEvent(projection, event)); + } + }; + builder = castBuilder(builder, Backgroundable.class).inBackground(backgroundCallback); + } + + return String.valueOf(castBuilder(builder, PathAndBytesable.class).forPath(createSpec.getPath(), createSpec.getData().getBytes())); + } + + private org.apache.zookeeper.CreateMode getRealMode(CreateMode mode) + { + switch ( mode ) + { + case PERSISTENT: + { + return org.apache.zookeeper.CreateMode.PERSISTENT; + } + + case PERSISTENT_SEQUENTIAL: + { + return org.apache.zookeeper.CreateMode.PERSISTENT_SEQUENTIAL; + } + + case EPHEMERAL: + { + return org.apache.zookeeper.CreateMode.EPHEMERAL; + } + + case EPHEMERAL_SEQUENTIAL: + { + return org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL; + } + } + throw new UnsupportedOperationException("Bad mode: " + mode.toString()); + } + + private CuratorFramework getClient(CuratorProjection projection) throws Exception + { + CuratorFramework client = projections.get(projection.getId()); + if ( client == null ) + { + throw new Exception("No client found with id: " + projection.getId()); + } + return client; + } + + private static <T> T castBuilder(Object createBuilder, Class<T> clazz) throws Exception + { + if ( clazz.isAssignableFrom(createBuilder.getClass()) ) + { + return clazz.cast(createBuilder); + } + throw new Exception("That operation is not available"); // TODO + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionSpec.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionSpec.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionSpec.java new file mode 100644 index 0000000..abec7f7 --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionSpec.java @@ -0,0 +1,8 @@ +package org.apache.curator.x.rpc.idl.projection; + +import com.facebook.swift.codec.ThriftStruct; + +@ThriftStruct +public class CuratorProjectionSpec +{ +} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/scripts/generate.sh ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/scripts/generate.sh b/curator-x-rpc/src/main/scripts/generate.sh index b094336..9fdc4c6 100755 --- a/curator-x-rpc/src/main/scripts/generate.sh +++ b/curator-x-rpc/src/main/scripts/generate.sh @@ -9,11 +9,20 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" BASE_DIR="$( cd "$DIR/../../../.." && pwd )" RPC_PATH="$BASE_DIR/curator-x-rpc/target/classes" -CLASSES="" -for f in `ls -m1 $RPC_PATH/org/apache/curator/x/rpc/idl/*.class | xargs -n 1 basename | sed s/\.[^\.]*$//`; + +PROJECTION_CLASSES="" +for f in `ls -m1 $RPC_PATH/org/apache/curator/x/rpc/idl/projection/*.class | xargs -n 1 basename | sed s/\.[^\.]*$//`; + do + if [[ $f != *[\$]* ]]; then + PROJECTION_CLASSES="$PROJECTION_CLASSES $f"; + fi; +done; + +EVENT_CLASSES="" +for f in `ls -m1 $RPC_PATH/org/apache/curator/x/rpc/idl/event/*.class | xargs -n 1 basename | sed s/\.[^\.]*$//`; do if [[ $f != *[\$]* ]]; then - CLASSES="$CLASSES $f"; + EVENT_CLASSES="$EVENT_CLASSES $f"; fi; done; @@ -25,6 +34,8 @@ PATHS="$PATHS:$BASE_DIR/curator-framework/target/classes" PATHS="$PATHS:$BASE_DIR/curator-recipes/target/classes" PATHS="$PATHS:$RPC_PATH" -PACKAGE="org.apache.curator.x.rpc.idl" +PROJECTION_PACKAGE="org.apache.curator.x.rpc.idl.projection" +EVENT_PACKAGE="org.apache.curator.x.rpc.idl.event" -java -cp $PATHS com.facebook.swift.generator.swift2thrift.Main -namespace cpp org.apache.curator -out "$THRIFT_DIR/curator.thrift" -package $PACKAGE $CLASSES +java -cp $PATHS com.facebook.swift.generator.swift2thrift.Main -namespace cpp org.apache.curator -out "$THRIFT_DIR/curator.thrift" -package $PROJECTION_PACKAGE $PROJECTION_CLASSES +java -cp $PATHS com.facebook.swift.generator.swift2thrift.Main -map org.apache.curator.x.rpc.idl.projection.CuratorProjection "curator.thrift" -namespace cpp org.apache.curator -out "$THRIFT_DIR/curator-event.thrift" -package $EVENT_PACKAGE $EVENT_CLASSES http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/thrift/curator-event.thrift ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/thrift/curator-event.thrift b/curator-x-rpc/src/main/thrift/curator-event.thrift new file mode 100644 index 0000000..9c407eb --- /dev/null +++ b/curator-x-rpc/src/main/thrift/curator-event.thrift @@ -0,0 +1,64 @@ +namespace java.swift org.apache.curator.x.rpc.idl.event +namespace cpp org.apache.curator + +include "curator.thrift" + +enum CuratorEventType { + CREATE, DELETE, EXISTS, GET_DATA, SET_DATA, CHILDREN, SYNC, GET_ACL, SET_ACL, WATCHED, CLOSING +} + +enum EventType { + None, NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged +} + +enum KeeperState { + Unknown, Disconnected, NoSyncConnected, SyncConnected, AuthFailed, ConnectedReadOnly, SaslAuthenticated, Expired +} + +struct id { + 1: string scheme; + 2: string id; +} + +struct Stat { + 1: i64 czxid; + 2: i64 mzxid; + 3: i64 ctime; + 4: i64 mtime; + 5: i32 version; + 6: i32 cversion; + 7: i32 aversion; + 8: i64 ephemeralOwner; + 9: i32 dataLength; + 10: i32 numChildren; + 11: i64 pzxid; +} + +struct WatchedEvent { + 1: KeeperState keeperState; + 2: EventType eventType; + 3: string path; +} + +struct Acl { + 1: i32 perms; + 2: id id; +} + +struct CuratorEvent { + 1: curator.CuratorProjection projection; + 2: CuratorEventType type; + 3: i32 resultCode; + 4: string path; + 5: string context; + 6: Stat stat; + 7: binary data; + 8: string name; + 9: list<string> children; + 10: list<Acl> aCLList; + 11: WatchedEvent watchedEvent; +} + +service CuratorEventService { + CuratorEvent getNextEvent(); +} http://git-wip-us.apache.org/repos/asf/curator/blob/f103e02d/curator-x-rpc/src/main/thrift/curator.thrift ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/thrift/curator.thrift b/curator-x-rpc/src/main/thrift/curator.thrift index 5662949..7973580 100644 --- a/curator-x-rpc/src/main/thrift/curator.thrift +++ b/curator-x-rpc/src/main/thrift/curator.thrift @@ -1,4 +1,4 @@ -namespace java.swift org.apache.curator.x.rpc.idl +namespace java.swift org.apache.curator.x.rpc.idl.projection namespace cpp org.apache.curator @@ -24,7 +24,8 @@ struct CuratorProjection { struct CuratorProjectionSpec { } -service curator { +service CuratorService { void closeCuratorProjection(1: CuratorProjection projection); + string create(1: CuratorProjection projection, 2: CreateSpec createSpec); CuratorProjection newCuratorProjection(1: CuratorProjectionSpec spec); }