some more leader methods - start of path children cache
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/41ac42bb Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/41ac42bb Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/41ac42bb Branch: refs/heads/curator-rpc Commit: 41ac42bb67cd9e3196ec2f0873a4efe595c6ed41 Parents: 1f11d38 Author: randgalt <randg...@apache.org> Authored: Thu May 29 14:55:50 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Thu May 29 14:55:50 2014 -0500 ---------------------------------------------------------------------- .../curator/x/rpc/idl/event/LeaderEvent.java | 2 +- .../curator/x/rpc/idl/event/LeaderResult.java | 8 +- .../x/rpc/idl/event/OptionalChildrenList.java | 2 +- .../curator/x/rpc/idl/event/OptionalPath.java | 2 +- .../apache/curator/x/rpc/idl/event/RpcId.java | 2 +- .../curator/x/rpc/idl/event/RpcParticipant.java | 24 + .../projection/CuratorProjectionService.java | 108 +- .../x/rpc/idl/projection/LeaderProjection.java | 20 + .../projection/PathChildrenCacheProjection.java | 20 + .../projection/PathChildrenCacheStartMode.java | 8 + curator-x-rpc/src/main/thrift/curator.thrift | 26 +- .../java/org/apache/curator/generated/Acl.java | 18 +- .../curator/generated/CuratorService.java | 4190 ++++++++++++++++-- .../curator/generated/LeaderProjection.java | 393 ++ .../apache/curator/generated/LeaderResult.java | 18 +- .../apache/curator/generated/Participant.java | 486 ++ .../generated/PathChildrenCacheProjection.java | 393 ++ .../generated/PathChildrenCacheStartMode.java | 48 + .../java/org/apache/curator/generated/id.java | 58 +- 19 files changed, 5291 insertions(+), 535 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/41ac42bb/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/LeaderEvent.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/LeaderEvent.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/LeaderEvent.java index bd86dce..677c401 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/LeaderEvent.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/LeaderEvent.java @@ -3,7 +3,7 @@ package org.apache.curator.x.rpc.idl.event; import com.facebook.swift.codec.ThriftField; import com.facebook.swift.codec.ThriftStruct; -@ThriftStruct("LeaderEvent") +@ThriftStruct public class LeaderEvent { @ThriftField(1) http://git-wip-us.apache.org/repos/asf/curator/blob/41ac42bb/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/LeaderResult.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/LeaderResult.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/LeaderResult.java index 7a67b62..8666105 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/LeaderResult.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/LeaderResult.java @@ -2,13 +2,13 @@ package org.apache.curator.x.rpc.idl.event; import com.facebook.swift.codec.ThriftField; import com.facebook.swift.codec.ThriftStruct; -import org.apache.curator.x.rpc.idl.projection.GenericProjection; +import org.apache.curator.x.rpc.idl.projection.LeaderProjection; -@ThriftStruct("LeaderResult") +@ThriftStruct public class LeaderResult { @ThriftField(1) - public GenericProjection projection; + public LeaderProjection projection; @ThriftField(2) public boolean hasLeadership; @@ -17,7 +17,7 @@ public class LeaderResult { } - public LeaderResult(GenericProjection projection, boolean hasLeadership) + public LeaderResult(LeaderProjection projection, boolean hasLeadership) { this.projection = projection; this.hasLeadership = hasLeadership; http://git-wip-us.apache.org/repos/asf/curator/blob/41ac42bb/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/OptionalChildrenList.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/OptionalChildrenList.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/OptionalChildrenList.java index 229c7bc..0f0a928 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/OptionalChildrenList.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/OptionalChildrenList.java @@ -4,7 +4,7 @@ import com.facebook.swift.codec.ThriftField; import com.facebook.swift.codec.ThriftStruct; import java.util.List; -@ThriftStruct("OptionalChildrenList") +@ThriftStruct public class OptionalChildrenList { @ThriftField(1) http://git-wip-us.apache.org/repos/asf/curator/blob/41ac42bb/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/OptionalPath.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/OptionalPath.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/OptionalPath.java index ba17a33..947dd75 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/OptionalPath.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/OptionalPath.java @@ -3,7 +3,7 @@ package org.apache.curator.x.rpc.idl.event; import com.facebook.swift.codec.ThriftField; import com.facebook.swift.codec.ThriftStruct; -@ThriftStruct("OptionalPath") +@ThriftStruct public class OptionalPath { @ThriftField(1) http://git-wip-us.apache.org/repos/asf/curator/blob/41ac42bb/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcId.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcId.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcId.java index 8c360d6..a86556a 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcId.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcId.java @@ -21,7 +21,7 @@ package org.apache.curator.x.rpc.idl.event; import com.facebook.swift.codec.ThriftField; import com.facebook.swift.codec.ThriftStruct; -@ThriftStruct("id") +@ThriftStruct("Id") public class RpcId { @ThriftField(1) http://git-wip-us.apache.org/repos/asf/curator/blob/41ac42bb/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcParticipant.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcParticipant.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcParticipant.java new file mode 100644 index 0000000..05a5d85 --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcParticipant.java @@ -0,0 +1,24 @@ +package org.apache.curator.x.rpc.idl.event; + +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.codec.ThriftStruct; + +@ThriftStruct("Participant") +public class RpcParticipant +{ + @ThriftField(1) + public String id; + + @ThriftField(2) + public boolean isLeader; + + public RpcParticipant() + { + } + + public RpcParticipant(String id, boolean isLeader) + { + this.id = id; + this.isLeader = isLeader; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/41ac42bb/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java index 97f3a27..03d0c9f 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java @@ -21,13 +21,21 @@ package org.apache.curator.x.rpc.idl.projection; import com.facebook.swift.service.ThriftMethod; import com.facebook.swift.service.ThriftService; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.*; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.curator.framework.recipes.leader.Participant; import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.ThreadUtils; import org.apache.curator.x.rpc.connections.Closer; import org.apache.curator.x.rpc.connections.ConnectionManager; import org.apache.curator.x.rpc.connections.CuratorEntry; @@ -39,11 +47,13 @@ import org.apache.curator.x.rpc.idl.event.OptionalChildrenList; import org.apache.curator.x.rpc.idl.event.OptionalPath; import org.apache.curator.x.rpc.idl.event.OptionalRpcStat; import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent; +import org.apache.curator.x.rpc.idl.event.RpcParticipant; import org.apache.curator.x.rpc.idl.event.RpcStat; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; @@ -336,7 +346,97 @@ public class CuratorProjectionService } GenericProjection leaderProjection = new GenericProjection(id); - return new LeaderResult(leaderProjection, leaderLatch.hasLeadership()); + return new LeaderResult(new LeaderProjection(leaderProjection), leaderLatch.hasLeadership()); + } + + @ThriftMethod + public List<RpcParticipant> getLeaderParticipants(CuratorProjection projection, LeaderProjection leaderProjection) throws Exception + { + CuratorEntry entry = getEntry(projection); + + LeaderLatch leaderLatch = getThis(entry, leaderProjection.projection.id, LeaderLatch.class); + Collection<Participant> participants = leaderLatch.getParticipants(); + return Lists.transform + ( + Lists.newArrayList(participants), + new Function<Participant, RpcParticipant>() + { + @Override + public RpcParticipant apply(Participant participant) + { + return new RpcParticipant(participant.getId(), participant.isLeader()); + } + } + ); + } + + @ThriftMethod + public boolean isLeader(CuratorProjection projection, LeaderProjection leaderProjection) throws Exception + { + CuratorEntry entry = getEntry(projection); + + LeaderLatch leaderLatch = getThis(entry, leaderProjection.projection.id, LeaderLatch.class); + return leaderLatch.hasLeadership(); + } + + @ThriftMethod + public PathChildrenCacheProjection startPathChildrenCache(final CuratorProjection projection, final String path, boolean cacheData, boolean dataIsCompressed, PathChildrenCacheStartMode startMode) throws Exception + { + CuratorEntry entry = getEntry(projection); + + PathChildrenCache cache = new PathChildrenCache(entry.getClient(), path, cacheData, dataIsCompressed, ThreadUtils.newThreadFactory("PathChildrenCacheResource-%d")); + PathChildrenCache.StartMode actualStartMode = PathChildrenCache.StartMode.NORMAL; + switch ( startMode ) + { + case NORMAL: + { + actualStartMode = PathChildrenCache.StartMode.NORMAL; + break; + } + + case BUILD_INITIAL_CACHE: + { + actualStartMode = PathChildrenCache.StartMode.BUILD_INITIAL_CACHE; + break; + } + + case POST_INITIALIZED_EVENT: + { + actualStartMode = PathChildrenCache.StartMode.POST_INITIALIZED_EVENT; + break; + } + } + cache.start(actualStartMode); + + Closer<PathChildrenCache> closer = new Closer<PathChildrenCache>() + { + @Override + public void close(PathChildrenCache cache) + { + try + { + cache.close(); + } + catch ( IOException e ) + { + log.error("Could not close left-over PathChildrenCache for path: " + path, e); + } + } + }; + final String id = CuratorEntry.newId(); + entry.addThing(id, cache, closer); + + PathChildrenCacheListener listener = new PathChildrenCacheListener() + { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + { + // TODO + } + }; + cache.getListenable().addListener(listener); + + return new PathChildrenCacheProjection(new GenericProjection(id)); } public void addEvent(CuratorProjection projection, RpcCuratorEvent event) @@ -394,4 +494,10 @@ public class CuratorProjectionService throw new Exception("That operation is not available"); // TODO } + private <T> T getThis(CuratorEntry entry, String id, Class<T> clazz) + { + T thing = entry.getThing(id, clazz); + Preconditions.checkNotNull(thing, "No item of type " + clazz.getSimpleName() + " found with id " + id); + return thing; + } } http://git-wip-us.apache.org/repos/asf/curator/blob/41ac42bb/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/LeaderProjection.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/LeaderProjection.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/LeaderProjection.java new file mode 100644 index 0000000..51fb56f --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/LeaderProjection.java @@ -0,0 +1,20 @@ +package org.apache.curator.x.rpc.idl.projection; + +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.codec.ThriftStruct; + +@ThriftStruct +public class LeaderProjection +{ + @ThriftField(1) + public GenericProjection projection; + + public LeaderProjection() + { + } + + public LeaderProjection(GenericProjection projection) + { + this.projection = projection; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/41ac42bb/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/PathChildrenCacheProjection.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/PathChildrenCacheProjection.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/PathChildrenCacheProjection.java new file mode 100644 index 0000000..666f43c --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/PathChildrenCacheProjection.java @@ -0,0 +1,20 @@ +package org.apache.curator.x.rpc.idl.projection; + +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.codec.ThriftStruct; + +@ThriftStruct +public class PathChildrenCacheProjection +{ + @ThriftField(1) + public GenericProjection projection; + + public PathChildrenCacheProjection() + { + } + + public PathChildrenCacheProjection(GenericProjection projection) + { + this.projection = projection; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/41ac42bb/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/PathChildrenCacheStartMode.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/PathChildrenCacheStartMode.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/PathChildrenCacheStartMode.java new file mode 100644 index 0000000..bf695ba --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/PathChildrenCacheStartMode.java @@ -0,0 +1,8 @@ +package org.apache.curator.x.rpc.idl.projection; + +public enum PathChildrenCacheStartMode +{ + NORMAL, + BUILD_INITIAL_CACHE, + POST_INITIALIZED_EVENT +} http://git-wip-us.apache.org/repos/asf/curator/blob/41ac42bb/curator-x-rpc/src/main/thrift/curator.thrift ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/thrift/curator.thrift b/curator-x-rpc/src/main/thrift/curator.thrift index ec52895..9ab4e62 100644 --- a/curator-x-rpc/src/main/thrift/curator.thrift +++ b/curator-x-rpc/src/main/thrift/curator.thrift @@ -7,6 +7,10 @@ enum CreateMode { PERSISTENT, PERSISTENT_SEQUENTIAL, EPHEMERAL, EPHEMERAL_SEQUENTIAL } +enum PathChildrenCacheStartMode { + NORMAL, BUILD_INITIAL_CACHE, POST_INITIALIZED_EVENT +} + enum CuratorEventType { PING, CREATE, DELETE, EXISTS, GET_DATA, SET_DATA, CHILDREN, SYNC, GET_ACL, SET_ACL, WATCHED, CLOSING, CONNECTION_CONNECTED, CONNECTION_SUSPENDED, CONNECTION_RECONNECTED, CONNECTION_LOST, CONNECTION_READ_ONLY, LEADER } @@ -56,6 +60,14 @@ struct GetDataSpec { 4: bool decompressed; } +struct LeaderProjection { + 1: GenericProjection projection; +} + +struct PathChildrenCacheProjection { + 1: GenericProjection projection; +} + struct Version { 1: i32 version; } @@ -67,7 +79,7 @@ struct LeaderEvent { } struct LeaderResult { - 1: GenericProjection projection; + 1: LeaderProjection projection; 2: bool hasLeadership; } @@ -79,11 +91,16 @@ struct OptionalPath { 1: string path; } -struct id { +struct Id { 1: string scheme; 2: string id; } +struct Participant { + 1: string id; + 2: bool isLeader; +} + struct Stat { 1: i64 czxid; 2: i64 mzxid; @@ -127,7 +144,7 @@ struct OptionalStat { struct Acl { 1: i32 perms; - 2: id id; + 2: Id id; } struct CuratorEvent { @@ -153,9 +170,12 @@ service CuratorService { OptionalStat exists(1: CuratorProjection projection, 2: ExistsSpec spec); OptionalChildrenList getChildren(1: CuratorProjection projection, 2: GetChildrenSpec spec); binary getData(1: CuratorProjection projection, 2: GetDataSpec spec); + list<Participant> getLeaderParticipants(1: CuratorProjection projection, 2: LeaderProjection leaderProjection); + bool isLeader(1: CuratorProjection projection, 2: LeaderProjection leaderProjection); CuratorProjection newCuratorProjection(1: string connectionName); Stat setData(1: CuratorProjection projection, 2: SetDataSpec spec); LeaderResult startLeaderSelector(1: CuratorProjection projection, 2: string path, 3: string participantId, 4: i32 waitForLeadershipMs); + PathChildrenCacheProjection startPathChildrenCache(1: CuratorProjection projection, 2: string path, 3: bool cacheData, 4: bool dataIsCompressed, 5: PathChildrenCacheStartMode startMode); } service EventService { http://git-wip-us.apache.org/repos/asf/curator/blob/41ac42bb/curator-x-rpc/src/test/java/org/apache/curator/generated/Acl.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/Acl.java b/curator-x-rpc/src/test/java/org/apache/curator/generated/Acl.java index 3dd4cb5..a31e11e 100644 --- a/curator-x-rpc/src/test/java/org/apache/curator/generated/Acl.java +++ b/curator-x-rpc/src/test/java/org/apache/curator/generated/Acl.java @@ -45,7 +45,7 @@ public class Acl implements org.apache.thrift.TBase<Acl, Acl._Fields>, java.io.S } public int perms; // required - public id id; // required + public Id id; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -117,7 +117,7 @@ public class Acl implements org.apache.thrift.TBase<Acl, Acl._Fields>, java.io.S tmpMap.put(_Fields.PERMS, new org.apache.thrift.meta_data.FieldMetaData("perms", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.ID, new org.apache.thrift.meta_data.FieldMetaData("id", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, id.class))); + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, Id.class))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Acl.class, metaDataMap); } @@ -127,7 +127,7 @@ public class Acl implements org.apache.thrift.TBase<Acl, Acl._Fields>, java.io.S public Acl( int perms, - id id) + Id id) { this(); this.perms = perms; @@ -142,7 +142,7 @@ public class Acl implements org.apache.thrift.TBase<Acl, Acl._Fields>, java.io.S __isset_bitfield = other.__isset_bitfield; this.perms = other.perms; if (other.isSetId()) { - this.id = new id(other.id); + this.id = new Id(other.id); } } @@ -180,11 +180,11 @@ public class Acl implements org.apache.thrift.TBase<Acl, Acl._Fields>, java.io.S __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PERMS_ISSET_ID, value); } - public id getId() { + public Id getId() { return this.id; } - public Acl setId(id id) { + public Acl setId(Id id) { this.id = id; return this; } @@ -218,7 +218,7 @@ public class Acl implements org.apache.thrift.TBase<Acl, Acl._Fields>, java.io.S if (value == null) { unsetId(); } else { - setId((id)value); + setId((Id)value); } break; @@ -408,7 +408,7 @@ public class Acl implements org.apache.thrift.TBase<Acl, Acl._Fields>, java.io.S break; case 2: // ID if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { - struct.id = new id(); + struct.id = new Id(); struct.id.read(iprot); struct.setIdIsSet(true); } else { @@ -480,7 +480,7 @@ public class Acl implements org.apache.thrift.TBase<Acl, Acl._Fields>, java.io.S struct.setPermsIsSet(true); } if (incoming.get(1)) { - struct.id = new id(); + struct.id = new Id(); struct.id.read(iprot); struct.setIdIsSet(true); }