WIP on service discovery
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/b79909a4 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/b79909a4 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/b79909a4 Branch: refs/heads/master Commit: b79909a4cb17ac5fe20f12fcd5b8e0761492d1b4 Parents: 87582a9 Author: randgalt <randg...@apache.org> Authored: Fri May 30 17:23:27 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Fri May 30 17:23:27 2014 -0500 ---------------------------------------------------------------------- .../x/rpc/idl/discovery/DiscoveryInstance.java | 21 +- .../x/rpc/idl/discovery/DiscoveryService.java | 11 +- .../idl/discovery/DiscoveryServiceLowLevel.java | 74 +- .../idl/services/CuratorProjectionService.java | 11 +- curator-x-rpc/src/main/thrift/curator.thrift | 3 + .../generated/DiscoveryServiceLowLevel.java | 3699 +++++++++++++++++- 6 files changed, 3754 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/b79909a4/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/discovery/DiscoveryInstance.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/discovery/DiscoveryInstance.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/discovery/DiscoveryInstance.java index 94e6307..4cd67d6 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/discovery/DiscoveryInstance.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/discovery/DiscoveryInstance.java @@ -40,15 +40,18 @@ public class DiscoveryInstance public DiscoveryInstance(ServiceInstance<byte[]> instance) { - this.name = instance.getName(); - this.id = instance.getId(); - this.address = instance.getAddress(); - this.port = instance.getPort(); - this.sslPort = instance.getSslPort(); - 
this.payload = instance.getPayload(); - this.registrationTimeUTC = instance.getRegistrationTimeUTC(); - this.serviceType = DiscoveryInstanceType.valueOf(instance.getServiceType().name()); - this.uriSpec = instance.buildUriSpec(); + if ( instance != null ) + { + this.name = instance.getName(); + this.id = instance.getId(); + this.address = instance.getAddress(); + this.port = instance.getPort(); + this.sslPort = instance.getSslPort(); + this.payload = instance.getPayload(); + this.registrationTimeUTC = instance.getRegistrationTimeUTC(); + this.serviceType = DiscoveryInstanceType.valueOf(instance.getServiceType().name()); + this.uriSpec = instance.buildUriSpec(); + } } public DiscoveryInstance(String name, String id, String address, Integer port, Integer sslPort, byte[] payload, long registrationTimeUTC, DiscoveryInstanceType serviceType, String uriSpec) http://git-wip-us.apache.org/repos/asf/curator/blob/b79909a4/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/discovery/DiscoveryService.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/discovery/DiscoveryService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/discovery/DiscoveryService.java index edfc141..5ed9a01 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/discovery/DiscoveryService.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/discovery/DiscoveryService.java @@ -3,6 +3,7 @@ package org.apache.curator.x.rpc.idl.discovery; import com.facebook.swift.service.ThriftMethod; import com.facebook.swift.service.ThriftService; import com.google.common.base.Function; +import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import org.apache.curator.x.discovery.DownInstancePolicy; import org.apache.curator.x.discovery.ProviderStrategy; @@ -21,6 +22,7 @@ import org.apache.curator.x.rpc.idl.structs.CuratorProjection; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; @@ -159,15 +161,15 @@ public class DiscoveryService } @ThriftMethod - public List<DiscoveryInstance> getAllInstances(CuratorProjection projection, DiscoveryProviderProjection providerProjection) throws RpcException + public Collection<DiscoveryInstance> getAllInstances(CuratorProjection projection, DiscoveryProviderProjection providerProjection) throws RpcException { CuratorEntry entry = CuratorEntry.mustGetEntry(connectionManager, projection); @SuppressWarnings("unchecked") ServiceProvider<byte[]> serviceProvider = CuratorEntry.mustGetThing(entry, providerProjection.id, ServiceProvider.class); try { - List<ServiceInstance<byte[]>> allInstances = Lists.newArrayList(serviceProvider.getAllInstances()); - return Lists.transform + Collection<ServiceInstance<byte[]>> allInstances = serviceProvider.getAllInstances(); + return Collections2.transform ( allInstances, new Function<ServiceInstance<byte[]>, DiscoveryInstance>() @@ -177,8 +179,7 @@ public class DiscoveryService { return new DiscoveryInstance(instance); } - } - ); + }); } catch ( Exception e ) { http://git-wip-us.apache.org/repos/asf/curator/blob/b79909a4/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/discovery/DiscoveryServiceLowLevel.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/discovery/DiscoveryServiceLowLevel.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/discovery/DiscoveryServiceLowLevel.java index b08be1d..fa7dbfe 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/discovery/DiscoveryServiceLowLevel.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/discovery/DiscoveryServiceLowLevel.java @@ -3,26 +3,16 @@ package org.apache.curator.x.rpc.idl.discovery; import 
com.facebook.swift.service.ThriftMethod; import com.facebook.swift.service.ThriftService; import com.google.common.base.Function; -import com.google.common.collect.Lists; -import org.apache.curator.x.discovery.DownInstancePolicy; -import org.apache.curator.x.discovery.ProviderStrategy; +import com.google.common.collect.Collections2; import org.apache.curator.x.discovery.ServiceDiscovery; -import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; -import org.apache.curator.x.discovery.ServiceProvider; -import org.apache.curator.x.discovery.strategies.RandomStrategy; -import org.apache.curator.x.discovery.strategies.RoundRobinStrategy; -import org.apache.curator.x.discovery.strategies.StickyStrategy; -import org.apache.curator.x.rpc.connections.Closer; import org.apache.curator.x.rpc.connections.ConnectionManager; import org.apache.curator.x.rpc.connections.CuratorEntry; import org.apache.curator.x.rpc.idl.exceptions.RpcException; import org.apache.curator.x.rpc.idl.structs.CuratorProjection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.Collection; @ThriftService public class DiscoveryServiceLowLevel @@ -82,4 +72,64 @@ public class DiscoveryServiceLowLevel throw new RpcException(e); } } + + @ThriftMethod + public Collection<String> queryForNames(CuratorProjection projection, DiscoveryProjection discoveryProjection) throws RpcException + { + CuratorEntry entry = CuratorEntry.mustGetEntry(connectionManager, projection); + @SuppressWarnings("unchecked") + ServiceDiscovery<byte[]> serviceDiscovery = CuratorEntry.mustGetThing(entry, discoveryProjection.id, ServiceDiscovery.class); + try + { + return serviceDiscovery.queryForNames(); + } + catch ( Exception e ) + { + throw new RpcException(e); + } + } + + @ThriftMethod + public DiscoveryInstance queryForInstance(CuratorProjection projection, 
DiscoveryProjection discoveryProjection, String name, String id) throws RpcException + { + CuratorEntry entry = CuratorEntry.mustGetEntry(connectionManager, projection); + @SuppressWarnings("unchecked") + ServiceDiscovery<byte[]> serviceDiscovery = CuratorEntry.mustGetThing(entry, discoveryProjection.id, ServiceDiscovery.class); + try + { + return new DiscoveryInstance(serviceDiscovery.queryForInstance(name, id)); + } + catch ( Exception e ) + { + throw new RpcException(e); + } + } + + @ThriftMethod + public Collection<DiscoveryInstance> queryForInstances(CuratorProjection projection, DiscoveryProjection discoveryProjection, String name) throws RpcException + { + CuratorEntry entry = CuratorEntry.mustGetEntry(connectionManager, projection); + @SuppressWarnings("unchecked") + ServiceDiscovery<byte[]> serviceDiscovery = CuratorEntry.mustGetThing(entry, discoveryProjection.id, ServiceDiscovery.class); + try + { + Collection<ServiceInstance<byte[]>> instances = serviceDiscovery.queryForInstances(name); + return Collections2.transform + ( + instances, + new Function<ServiceInstance<byte[]>, DiscoveryInstance>() + { + @Override + public DiscoveryInstance apply(ServiceInstance<byte[]> instance) + { + return new DiscoveryInstance(instance); + } + } + ); + } + catch ( Exception e ) + { + throw new RpcException(e); + } + } } http://git-wip-us.apache.org/repos/asf/curator/blob/b79909a4/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/services/CuratorProjectionService.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/services/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/services/CuratorProjectionService.java index b7a1145..d218e7b 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/services/CuratorProjectionService.java +++ 
b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/services/CuratorProjectionService.java @@ -22,6 +22,7 @@ package org.apache.curator.x.rpc.idl.services; import com.facebook.swift.service.ThriftMethod; import com.facebook.swift.service.ThriftService; import com.google.common.base.Function; +import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.*; @@ -422,7 +423,7 @@ public class CuratorProjectionService } @ThriftMethod - public List<RpcParticipant> getLeaderParticipants(CuratorProjection projection, LeaderProjection leaderProjection) throws RpcException + public Collection<RpcParticipant> getLeaderParticipants(CuratorProjection projection, LeaderProjection leaderProjection) throws RpcException { try { @@ -430,14 +431,18 @@ public class CuratorProjectionService LeaderLatch leaderLatch = CuratorEntry.mustGetThing(entry, leaderProjection.id, LeaderLatch.class); Collection<Participant> participants = leaderLatch.getParticipants(); - return Lists.transform(Lists.newArrayList(participants), new Function<Participant, RpcParticipant>() + return Collections2.transform + ( + participants, + new Function<Participant, RpcParticipant>() { @Override public RpcParticipant apply(Participant participant) { return new RpcParticipant(participant.getId(), participant.isLeader()); } - }); + } + ); } catch ( Exception e ) { http://git-wip-us.apache.org/repos/asf/curator/blob/b79909a4/curator-x-rpc/src/main/thrift/curator.thrift ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/thrift/curator.thrift b/curator-x-rpc/src/main/thrift/curator.thrift index 1008b32..11e5edb 100644 --- a/curator-x-rpc/src/main/thrift/curator.thrift +++ b/curator-x-rpc/src/main/thrift/curator.thrift @@ -278,6 +278,9 @@ service DiscoveryService { } service DiscoveryServiceLowLevel { + DiscoveryInstance queryForInstance(1: 
CuratorProjection projection, 2: DiscoveryProjection discoveryProjection, 3: string name, 4: string id) throws (1: CuratorException ex1); + list<DiscoveryInstance> queryForInstances(1: CuratorProjection projection, 2: DiscoveryProjection discoveryProjection, 3: string name) throws (1: CuratorException ex1); + list<string> queryForNames(1: CuratorProjection projection, 2: DiscoveryProjection discoveryProjection) throws (1: CuratorException ex1); void registerInstance(1: CuratorProjection projection, 2: DiscoveryProjection discoveryProjection, 3: DiscoveryInstance instance) throws (1: CuratorException ex1); void unregisterInstance(1: CuratorProjection projection, 2: DiscoveryProjection discoveryProjection, 3: DiscoveryInstance instance) throws (1: CuratorException ex1); void updateInstance(1: CuratorProjection projection, 2: DiscoveryProjection discoveryProjection, 3: DiscoveryInstance instance) throws (1: CuratorException ex1);