Repository: curator Updated Branches: refs/heads/CURATOR-3.0 8e7fbac86 -> 836369284
Add 'enabled' flag to service instance. Service providers will omit any instance that is marked 'enabled: false'. The low-level API provided by ServiceDiscovery (e.g. queryForInstance, queryForInstances) will continue to return all instances, including those that are marked 'enabled: false'. This change allows instances to be temporarily removed from service discovery via an update to the instance node data. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/4776a2da Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/4776a2da Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/4776a2da Branch: refs/heads/CURATOR-3.0 Commit: 4776a2daacb3a776e9dd0bdc214ed4987fef96c9 Parents: 31c0465 Author: Joe Littlejohn <joelittlej...@gmail.com> Authored: Mon Nov 2 22:12:43 2015 +0000 Committer: Joe Littlejohn <joelittlej...@gmail.com> Committed: Mon Nov 2 22:35:20 2015 +0000 ---------------------------------------------------------------------- .../curator/x/discovery/ServiceInstance.java | 17 ++- .../x/discovery/ServiceInstanceBuilder.java | 9 +- .../details/EnabledInstanceFilter.java | 33 ++++++ .../discovery/details/ServiceProviderImpl.java | 1 + .../x/discovery/TestJsonInstanceSerializer.java | 35 ++++-- .../discovery/details/TestServiceProvider.java | 109 +++++++++++++++++++ 6 files changed, 191 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/4776a2da/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceInstance.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceInstance.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceInstance.java index 939e708..820e92c 100644 --- 
a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceInstance.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceInstance.java @@ -42,6 +42,7 @@ public class ServiceInstance<T> private final long registrationTimeUTC; private final ServiceType serviceType; private final UriSpec uriSpec; + private final boolean enabled; /** * Return a new builder. The {@link #address} is set to the ip of the first @@ -75,7 +76,7 @@ public class ServiceInstance<T> * @param serviceType type of the service * @param uriSpec the uri spec or null */ - public ServiceInstance(String name, String id, String address, Integer port, Integer sslPort, T payload, long registrationTimeUTC, ServiceType serviceType, UriSpec uriSpec) + public ServiceInstance(String name, String id, String address, Integer port, Integer sslPort, T payload, long registrationTimeUTC, ServiceType serviceType, UriSpec uriSpec, boolean enabled) { name = Preconditions.checkNotNull(name, "name cannot be null"); id = Preconditions.checkNotNull(id, "id cannot be null"); @@ -89,6 +90,7 @@ public class ServiceInstance<T> this.sslPort = sslPort; this.payload = payload; this.registrationTimeUTC = registrationTimeUTC; + this.enabled = enabled; } /** @@ -96,7 +98,7 @@ public class ServiceInstance<T> */ ServiceInstance() { - this("", "", null, null, null, null, 0, ServiceType.DYNAMIC, null); + this("", "", null, null, null, null, 0, ServiceType.DYNAMIC, null, true); } public String getName() @@ -145,6 +147,11 @@ public class ServiceInstance<T> return uriSpec; } + public boolean isEnabled() + { + return enabled; + } + public String buildUriSpec() { return buildUriSpec(Maps.<String, Object>newHashMap()); @@ -206,6 +213,10 @@ public class ServiceInstance<T> { return false; } + if ( enabled != that.enabled ) + { + return false; + } return true; } @@ -222,6 +233,7 @@ public class ServiceInstance<T> result = 31 * result + (int)(registrationTimeUTC ^ (registrationTimeUTC >>> 32)); result = 
31 * result + (serviceType != null ? serviceType.hashCode() : 0); result = 31 * result + (uriSpec != null ? uriSpec.hashCode() : 0); + result = 31 * result + (enabled ? 1231 : 1237); return result; } @@ -238,6 +250,7 @@ public class ServiceInstance<T> ", registrationTimeUTC=" + registrationTimeUTC + ", serviceType=" + serviceType + ", uriSpec=" + uriSpec + + ", enabled=" + enabled + '}'; } } http://git-wip-us.apache.org/repos/asf/curator/blob/4776a2da/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceInstanceBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceInstanceBuilder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceInstanceBuilder.java index 4365cd6..c2ea40e 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceInstanceBuilder.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceInstanceBuilder.java @@ -41,6 +41,7 @@ public class ServiceInstanceBuilder<T> private long registrationTimeUTC; private ServiceType serviceType = ServiceType.DYNAMIC; private UriSpec uriSpec; + private boolean enabled = true; private static final AtomicReference<LocalIpFilter> localIpFilter = new AtomicReference<LocalIpFilter> ( @@ -85,7 +86,7 @@ public class ServiceInstanceBuilder<T> */ public ServiceInstance<T> build() { - return new ServiceInstance<T>(name, id, address, port, sslPort, payload, registrationTimeUTC, serviceType, uriSpec); + return new ServiceInstance<T>(name, id, address, port, sslPort, payload, registrationTimeUTC, serviceType, uriSpec, enabled); } public ServiceInstanceBuilder<T> name(String name) @@ -142,6 +143,12 @@ public class ServiceInstanceBuilder<T> return this; } + public ServiceInstanceBuilder<T> enabled(boolean enabled) + { + this.enabled = enabled; + return this; + } + /** * based on http://pastebin.com/5X073pUc * <p> 
http://git-wip-us.apache.org/repos/asf/curator/blob/4776a2da/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/EnabledInstanceFilter.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/EnabledInstanceFilter.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/EnabledInstanceFilter.java new file mode 100644 index 0000000..d153653 --- /dev/null +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/EnabledInstanceFilter.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.x.discovery.details; + +import org.apache.curator.x.discovery.InstanceFilter; +import org.apache.curator.x.discovery.ServiceInstance; + +public class EnabledInstanceFilter<T> implements InstanceFilter<T> +{ + + @Override + public boolean apply(ServiceInstance<T> instance) + { + return instance.isEnabled(); + } + +} http://git-wip-us.apache.org/repos/asf/curator/blob/4776a2da/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java index 5c63836..4cbd126 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java @@ -53,6 +53,7 @@ public class ServiceProviderImpl<T> implements ServiceProvider<T> ArrayList<InstanceFilter<T>> localFilters = Lists.newArrayList(filters); localFilters.add(downInstanceManager); + localFilters.add(new EnabledInstanceFilter<T>()); instanceProvider = new FilteredInstanceProvider<T>(cache, localFilters); } http://git-wip-us.apache.org/repos/asf/curator/blob/4776a2da/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestJsonInstanceSerializer.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestJsonInstanceSerializer.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestJsonInstanceSerializer.java index 8417710..f17919d 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestJsonInstanceSerializer.java +++ 
b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestJsonInstanceSerializer.java @@ -32,8 +32,8 @@ public class TestJsonInstanceSerializer @Test public void testBasic() throws Exception { - JsonInstanceSerializer<String> serializer = new JsonInstanceSerializer<String>(String.class); - ServiceInstance<String> instance = new ServiceInstance<String>("name", "id", "address", 10, 20, "payload", 0, ServiceType.DYNAMIC, new UriSpec("{a}/b/{c}")); + JsonInstanceSerializer<String> serializer = new JsonInstanceSerializer<String>(String.class); + ServiceInstance<String> instance = new ServiceInstance<String>("name", "id", "address", 10, 20, "payload", 0, ServiceType.DYNAMIC, new UriSpec("{a}/b/{c}"), true); byte[] bytes = serializer.serialize(instance); ServiceInstance<String> rhs = serializer.deserialize(bytes); @@ -45,6 +45,7 @@ public class TestJsonInstanceSerializer Assert.assertEquals(instance.getPort(), rhs.getPort()); Assert.assertEquals(instance.getSslPort(), rhs.getSslPort()); Assert.assertEquals(instance.getUriSpec(), rhs.getUriSpec()); + Assert.assertEquals(instance.isEnabled(), rhs.isEnabled()); } @Test @@ -53,7 +54,7 @@ public class TestJsonInstanceSerializer JsonInstanceSerializer<String> stringSerializer = new JsonInstanceSerializer<String>(String.class); JsonInstanceSerializer<Double> doubleSerializer = new JsonInstanceSerializer<Double>(Double.class); - byte[] bytes = stringSerializer.serialize(new ServiceInstance<String>("name", "id", "address", 10, 20, "payload", 0, ServiceType.DYNAMIC, new UriSpec("{a}/b/{c}"))); + byte[] bytes = stringSerializer.serialize(new ServiceInstance<String>("name", "id", "address", 10, 20, "payload", 0, ServiceType.DYNAMIC, new UriSpec("{a}/b/{c}"), true)); try { doubleSerializer.deserialize(bytes); @@ -69,7 +70,7 @@ public class TestJsonInstanceSerializer public void testNoPayload() throws Exception { JsonInstanceSerializer<Void> serializer = new JsonInstanceSerializer<Void>(Void.class); - ServiceInstance<Void> 
instance = new ServiceInstance<Void>("name", "id", "address", 10, 20, null, 0, ServiceType.DYNAMIC, new UriSpec("{a}/b/{c}")); + ServiceInstance<Void> instance = new ServiceInstance<Void>("name", "id", "address", 10, 20, null, 0, ServiceType.DYNAMIC, new UriSpec("{a}/b/{c}"), true); byte[] bytes = serializer.serialize(instance); ServiceInstance<Void> rhs = serializer.deserialize(bytes); @@ -81,6 +82,17 @@ public class TestJsonInstanceSerializer Assert.assertEquals(instance.getPort(), rhs.getPort()); Assert.assertEquals(instance.getSslPort(), rhs.getSslPort()); Assert.assertEquals(instance.getUriSpec(), rhs.getUriSpec()); + Assert.assertEquals(instance.isEnabled(), rhs.isEnabled()); + } + + @Test + public void testNoEnabledState() throws Exception + { + JsonInstanceSerializer<Void> serializer = new JsonInstanceSerializer<Void>(Void.class); + byte[] bytes = "{}".getBytes("utf-8"); + + ServiceInstance<Void> instance = serializer.deserialize(bytes); + Assert.assertTrue(instance.isEnabled(), "Instance that has no 'enabled' should be assumed enabled"); } @Test @@ -90,8 +102,8 @@ public class TestJsonInstanceSerializer List<String> payload = new ArrayList<String>(); payload.add("Test value 1"); payload.add("Test value 2"); - ServiceInstance<Object> instance = new ServiceInstance<Object>("name", "id", "address", 10, 20, payload, 0, ServiceType.DYNAMIC, new UriSpec("{a}/b/{c}")); - byte[] bytes = serializer.serialize(instance); + ServiceInstance<Object> instance = new ServiceInstance<Object>("name", "id", "address", 10, 20, payload, 0, ServiceType.DYNAMIC, new UriSpec("{a}/b/{c}"), false); + byte[] bytes = serializer.serialize(instance); ServiceInstance<Object> rhs = serializer.deserialize(bytes); Assert.assertEquals(instance, rhs); @@ -102,6 +114,7 @@ public class TestJsonInstanceSerializer Assert.assertEquals(instance.getPort(), rhs.getPort()); Assert.assertEquals(instance.getSslPort(), rhs.getSslPort()); Assert.assertEquals(instance.getUriSpec(), rhs.getUriSpec()); + 
Assert.assertEquals(instance.isEnabled(), rhs.isEnabled()); } @@ -112,8 +125,8 @@ public class TestJsonInstanceSerializer Map<String,String> payload = new HashMap<String,String>(); payload.put("1", "Test value 1"); payload.put("2", "Test value 2"); - ServiceInstance<Object> instance = new ServiceInstance<Object>("name", "id", "address", 10, 20, payload, 0, ServiceType.DYNAMIC, new UriSpec("{a}/b/{c}")); - byte[] bytes = serializer.serialize(instance); + ServiceInstance<Object> instance = new ServiceInstance<Object>("name", "id", "address", 10, 20, payload, 0, ServiceType.DYNAMIC, new UriSpec("{a}/b/{c}"), false); + byte[] bytes = serializer.serialize(instance); ServiceInstance<Object> rhs = serializer.deserialize(bytes); Assert.assertEquals(instance, rhs); @@ -124,6 +137,7 @@ public class TestJsonInstanceSerializer Assert.assertEquals(instance.getPort(), rhs.getPort()); Assert.assertEquals(instance.getSslPort(), rhs.getSslPort()); Assert.assertEquals(instance.getUriSpec(), rhs.getUriSpec()); + Assert.assertEquals(instance.isEnabled(), rhs.isEnabled()); } @Test @@ -132,8 +146,8 @@ public class TestJsonInstanceSerializer JsonInstanceSerializer<Payload> serializer = new JsonInstanceSerializer<Payload>(Payload.class); Payload payload = new Payload(); payload.setVal("Test value"); - ServiceInstance<Payload> instance = new ServiceInstance<Payload>("name", "id", "address", 10, 20, payload, 0, ServiceType.DYNAMIC, new UriSpec("{a}/b/{c}")); - byte[] bytes = serializer.serialize(instance); + ServiceInstance<Payload> instance = new ServiceInstance<Payload>("name", "id", "address", 10, 20, payload, 0, ServiceType.DYNAMIC, new UriSpec("{a}/b/{c}"), true); + byte[] bytes = serializer.serialize(instance); ServiceInstance<Payload> rhs = serializer.deserialize(bytes); Assert.assertEquals(instance, rhs); @@ -144,6 +158,7 @@ public class TestJsonInstanceSerializer Assert.assertEquals(instance.getPort(), rhs.getPort()); Assert.assertEquals(instance.getSslPort(), rhs.getSslPort()); 
Assert.assertEquals(instance.getUriSpec(), rhs.getUriSpec()); + Assert.assertEquals(instance.isEnabled(), rhs.isEnabled()); } public static class Payload { http://git-wip-us.apache.org/repos/asf/curator/blob/4776a2da/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java new file mode 100644 index 0000000..fae7266 --- /dev/null +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceProvider.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.x.discovery.details; + +import java.io.Closeable; +import java.util.Collections; +import java.util.List; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.ServiceProvider; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.Lists; + +public class TestServiceProvider extends BaseClassForTests +{ + + @Test + public void testBasic() throws Exception + { + List<Closeable> closeables = Lists.newArrayList(); + try + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + closeables.add(client); + client.start(); + + ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build(); + ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build(); + closeables.add(discovery); + discovery.start(); + + ServiceProvider<String> provider = discovery.serviceProviderBuilder().serviceName("test").build(); + closeables.add(provider); + provider.start(); + + Assert.assertEquals(provider.getInstance(), instance); + + List<ServiceInstance<String>> list = Lists.newArrayList(); + list.add(instance); + Assert.assertEquals(provider.getAllInstances(), list); + } + finally + { + Collections.reverse(closeables); + for ( Closeable c : closeables ) + { + CloseableUtils.closeQuietly(c); + } + } + } + + @Test + public void testDisabledInstance() throws Exception + { + List<Closeable> closeables = 
Lists.newArrayList(); + try + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + closeables.add(client); + client.start(); + + ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).enabled(false).build(); + ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build(); + closeables.add(discovery); + discovery.start(); + + ServiceProvider<String> provider = discovery.serviceProviderBuilder().serviceName("test").build(); + closeables.add(provider); + provider.start(); + + Assert.assertEquals(provider.getInstance(), null); + Assert.assertTrue(provider.getAllInstances().isEmpty(), "Disabled instance still appears available via service provider"); + } + finally + { + Collections.reverse(closeables); + for ( Closeable c : closeables ) + { + CloseableUtils.closeQuietly(c); + } + } + } + +}