CAMEL-9997: ServiceCall : ServiceCallServerListStrategy to support dynamic service name
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7eae925f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7eae925f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7eae925f Branch: refs/heads/master Commit: 7eae925f31d3dd1f9823b490c4eeadc7722fbced Parents: ce00795 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Mon May 30 10:58:08 2016 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Tue Jun 7 16:30:43 2016 +0200 ---------------------------------------------------------------------- .../remote/DefaultServiceCallExpression.java | 5 +--- .../remote/DefaultServiceCallProcessor.java | 8 +++---- .../camel/impl/remote/ServiceCallConstants.java | 24 ++++++++++++++++++++ .../spi/ServiceCallServerListStrategy.java | 8 +++++-- .../ConsulServiceCallServerListStrategies.java | 16 ++++++------- .../ConsulServiceCallServerListStrategy.java | 10 ++------ .../service/ServiceCallClientRouteTest.java | 3 +-- .../KubernetesClientServiceCallProcessor.java | 6 +++-- ...KubernetesServiceCallServerListStrategy.java | 8 +++---- ...bbonServiceCallStaticServerListStrategy.java | 11 +++++++++ 10 files changed, 63 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7eae925f/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallExpression.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallExpression.java b/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallExpression.java index 5ce2cca..8d790b2 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallExpression.java +++ b/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallExpression.java @@ -22,14 +22,11 @@ import org.apache.camel.support.ServiceCallExpressionSupport; import org.apache.camel.util.ExchangeHelper; public class DefaultServiceCallExpression extends ServiceCallExpressionSupport { - public static final String SERVER_IP = "CamelServiceCallServerIp"; - public static final String SERVER_PORT = "CamelServiceCallServerPort"; - private final String ipHeader; private final String portHeader; public DefaultServiceCallExpression(String name, String scheme, String contextPath, String uri) { - this(name, scheme, contextPath, uri, SERVER_IP, SERVER_PORT); + this(name, scheme, contextPath, uri, ServiceCallConstants.SERVER_IP, ServiceCallConstants.SERVER_PORT); } public DefaultServiceCallExpression(String name, String scheme, String contextPath, String uri, String ipHeader, String portHeader) { http://git-wip-us.apache.org/repos/asf/camel/blob/7eae925f/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallProcessor.java b/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallProcessor.java index 4cd40d2..069bd18 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/impl/remote/DefaultServiceCallProcessor.java @@ -138,7 +138,6 @@ public class DefaultServiceCallProcessor extends ServiceSupport implements Async @Override protected void doStart() throws Exception { - ObjectHelper.notEmpty(name, "name", this); ObjectHelper.notNull(camelContext, "camelContext"); ObjectHelper.notNull(serverListStrategy, "serverListStrategy"); ObjectHelper.notNull(loadBalancer, "loadBalancer"); @@ -169,8 +168,9 @@ public class DefaultServiceCallProcessor extends ServiceSupport implements Async @Override public boolean process(Exchange exchange, AsyncCallback callback) { Collection<ServiceCallServer> servers = null; + String serviceName = exchange.getIn().getHeader(ServiceCallConstants.SERVICE_NAME, name, String.class); try { - servers = serverListStrategy.getUpdatedListOfServers(); + servers = serverListStrategy.getUpdatedListOfServers(serviceName); if (servers == null || servers.isEmpty()) { exchange.setException(new RejectedExecutionException("No active services with name " + name)); } @@ -190,8 +190,8 @@ public class DefaultServiceCallProcessor extends ServiceSupport implements Async LOG.debug("Service {} active at server: {}:{}", name, ip, port); // set selected server as header - exchange.getIn().setHeader(DefaultServiceCallExpression.SERVER_IP, ip); - exchange.getIn().setHeader(DefaultServiceCallExpression.SERVER_PORT, port); + exchange.getIn().setHeader(ServiceCallConstants.SERVER_IP, ip); + exchange.getIn().setHeader(ServiceCallConstants.SERVER_PORT, port); // use the dynamic send processor to call the service return processor.process(exchange, callback); http://git-wip-us.apache.org/repos/asf/camel/blob/7eae925f/camel-core/src/main/java/org/apache/camel/impl/remote/ServiceCallConstants.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/remote/ServiceCallConstants.java b/camel-core/src/main/java/org/apache/camel/impl/remote/ServiceCallConstants.java new file mode 100644 index 0000000..806b854 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/remote/ServiceCallConstants.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.remote; + + +public interface ServiceCallConstants { + String SERVICE_NAME = "CamelServiceCallServiceNa,e"; + String SERVER_IP = "CamelServiceCallServerIp"; + String SERVER_PORT = "CamelServiceCallServerPort"; +} http://git-wip-us.apache.org/repos/asf/camel/blob/7eae925f/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServerListStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServerListStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServerListStrategy.java index b7edee9..64fd804 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServerListStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/spi/ServiceCallServerListStrategy.java @@ -30,8 +30,10 @@ public interface ServiceCallServerListStrategy<T extends ServiceCallServer> { * Gets the initial list of servers. * <p/> * This method may return <tt>null</tt> or an empty list. + * + * @param name the service name */ - Collection<T> getInitialListOfServers(); + Collection<T> getInitialListOfServers(String name); /** * Gets the updated list of servers. @@ -39,7 +41,9 @@ public interface ServiceCallServerListStrategy<T extends ServiceCallServer> { * This method can either be called on-demand prior to a service call, or have * a background job that is scheduled to update the list, or a watcher * that triggers when the list of servers changes. + * + * @param name the service name */ - Collection<T> getUpdatedListOfServers(); + Collection<T> getUpdatedListOfServers(String name); } http://git-wip-us.apache.org/repos/asf/camel/blob/7eae925f/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategies.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategies.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategies.java index c66dcdb..8449bfa 100644 --- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategies.java +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategies.java @@ -28,24 +28,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public final class ConsulServiceCallServerListStrategies { - private static final Logger LOGGER = LoggerFactory.getLogger(ConsulServiceCallServerListStrategies.class); - private ConsulServiceCallServerListStrategies() { } public static final class OnDemand extends ConsulServiceCallServerListStrategy { - public OnDemand(ConsulConfiguration configuration, String name) throws Exception { - super(configuration, name); + public OnDemand(ConsulConfiguration configuration) throws Exception { + super(configuration); } @Override - public Collection<ServiceCallServer> getUpdatedListOfServers() { + public Collection<ServiceCallServer> getUpdatedListOfServers(String name) { List<CatalogService> services = getCatalogClient() - .getService(getName(), getCatalogOptions()) + .getService(name, getCatalogOptions()) .getResponse(); List<ServiceHealth> healths = getHealthClient() - .getAllServiceInstances(getName(), getCatalogOptions()) + .getAllServiceInstances(name, getCatalogOptions()) .getResponse(); return services.stream() @@ -64,7 +62,7 @@ public final class ConsulServiceCallServerListStrategies { // Helpers // ************************************************************************* - public static ConsulServiceCallServerListStrategy onDemand(ConsulConfiguration configuration, String name) throws Exception { - return new OnDemand(configuration, name); + public static ConsulServiceCallServerListStrategy onDemand(ConsulConfiguration configuration) throws Exception { + return new OnDemand(configuration); } } http://git-wip-us.apache.org/repos/asf/camel/blob/7eae925f/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategy.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategy.java index ebe5ddf..886e929 100644 --- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategy.java +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/processor/service/ConsulServiceCallServerListStrategy.java @@ -38,12 +38,10 @@ import org.apache.camel.util.ObjectHelper; abstract class ConsulServiceCallServerListStrategy extends ServiceSupport implements ServiceCallServerListStrategy<ServiceCallServer> { private final Consul client; - private final String name; private final CatalogOptions catalogOptions; - ConsulServiceCallServerListStrategy(ConsulConfiguration configuration, String name) throws Exception { + ConsulServiceCallServerListStrategy(ConsulConfiguration configuration) throws Exception { this.client = configuration.createConsulClient(); - this.name = name; ImmutableCatalogOptions.Builder builder = ImmutableCatalogOptions.builder(); if (ObjectHelper.isNotEmpty(configuration.getDc())) { @@ -57,7 +55,7 @@ abstract class ConsulServiceCallServerListStrategy extends ServiceSupport implem } @Override - public Collection<ServiceCallServer> getInitialListOfServers() { + public Collection<ServiceCallServer> getInitialListOfServers(String name) { return Collections.emptyList(); } @@ -92,10 +90,6 @@ abstract class ConsulServiceCallServerListStrategy extends ServiceSupport implem return client.healthClient(); } - protected String getName() { - return name; - } - protected CatalogOptions getCatalogOptions() { return catalogOptions; } http://git-wip-us.apache.org/repos/asf/camel/blob/7eae925f/components/camel-consul/src/test/java/org/apache/camel/component/consul/processor/service/ServiceCallClientRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/processor/service/ServiceCallClientRouteTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/processor/service/ServiceCallClientRouteTest.java index e3f753d..eb18a9e 100644 --- a/components/camel-consul/src/test/java/org/apache/camel/component/consul/processor/service/ServiceCallClientRouteTest.java +++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/processor/service/ServiceCallClientRouteTest.java @@ -102,8 +102,7 @@ public class ServiceCallClientRouteTest extends ConsulTestSupport { config.setComponent("http"); config.setLoadBalancer(new RoundRobinServiceCallLoadBalancer()); config.setServerListStrategy(ConsulServiceCallServerListStrategies.onDemand( - new ConsulConfiguration(context()), - SERVICE_NAME + new ConsulConfiguration(context()) )); // register configuration http://git-wip-us.apache.org/repos/asf/camel/blob/7eae925f/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesClientServiceCallProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesClientServiceCallProcessor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesClientServiceCallProcessor.java index f1af317..0ce579a 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesClientServiceCallProcessor.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesClientServiceCallProcessor.java @@ -31,6 +31,7 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.Traceable; import org.apache.camel.component.kubernetes.KubernetesConfiguration; import org.apache.camel.component.kubernetes.KubernetesConstants; +import org.apache.camel.impl.remote.ServiceCallConstants; import org.apache.camel.processor.SendDynamicProcessor; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ServiceCallLoadBalancer; @@ -104,8 +105,9 @@ public class KubernetesClientServiceCallProcessor extends ServiceSupport impleme @Override public boolean process(Exchange exchange, AsyncCallback callback) { Collection<KubernetesServer> servers = null; + String serviceName = exchange.getIn().getHeader(ServiceCallConstants.SERVICE_NAME, name, String.class); try { - servers = serverListStrategy.getUpdatedListOfServers(); + servers = serverListStrategy.getUpdatedListOfServers(serviceName); if (servers == null || servers.isEmpty()) { exchange.setException(new RejectedExecutionException("No active services with name " + name + " in namespace " + namespace)); } @@ -183,7 +185,7 @@ public class KubernetesClientServiceCallProcessor extends ServiceSupport impleme loadBalancer = new RandomLoadBalancer(); } if (serverListStrategy == null) { - serverListStrategy = new KubernetesServiceCallServerListStrategy(name, namespace, null, createKubernetesClient()); + serverListStrategy = new KubernetesServiceCallServerListStrategy(namespace, null, createKubernetesClient()); } LOG.info("KubernetesServiceCall at namespace: {} with service name: {} is using load balancer: {} and service discovery: {}", namespace, name, loadBalancer, serverListStrategy); http://git-wip-us.apache.org/repos/asf/camel/blob/7eae925f/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java index f35cfed..218a933 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallServerListStrategy.java @@ -41,13 +41,11 @@ public class KubernetesServiceCallServerListStrategy extends ServiceSupport impl private static final Logger LOG = LoggerFactory.getLogger(KubernetesServiceCallServerListStrategy.class); private static final int FIRST = 0; - private String name; private String namespace; private String portName; private AutoAdaptableKubernetesClient client; - public KubernetesServiceCallServerListStrategy(String name, String namespace, String portName, AutoAdaptableKubernetesClient client) { - this.name = name; + public KubernetesServiceCallServerListStrategy(String namespace, String portName, AutoAdaptableKubernetesClient client) { this.namespace = namespace; this.portName = portName; this.client = client; @@ -55,11 +53,11 @@ public class KubernetesServiceCallServerListStrategy extends ServiceSupport impl @Override @SuppressWarnings("unchecked") - public Collection<KubernetesServer> getInitialListOfServers() { + public Collection<KubernetesServer> getInitialListOfServers(String name) { return Collections.EMPTY_LIST; } - public Collection<KubernetesServer> getUpdatedListOfServers() { + public Collection<KubernetesServer> getUpdatedListOfServers(String name) { LOG.debug("Discovering endpoints from namespace: {} with name: {}", namespace, name); Endpoints endpoints = client.endpoints().inNamespace(namespace).withName(name).get(); List<KubernetesServer> result = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/camel/blob/7eae925f/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallStaticServerListStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallStaticServerListStrategy.java b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallStaticServerListStrategy.java index 0e3ced4..1cb0de3 100644 --- a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallStaticServerListStrategy.java +++ b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallStaticServerListStrategy.java @@ -17,6 +17,7 @@ package org.apache.camel.component.ribbon.processor; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import com.netflix.client.config.IClientConfig; @@ -95,4 +96,14 @@ public class RibbonServiceCallStaticServerListStrategy extends AbstractServerLis public List<RibbonServer> getUpdatedListOfServers() { return servers; } + + @Override + public Collection<RibbonServer> getInitialListOfServers(String name) { + return getInitialListOfServers(); + } + + @Override + public Collection<RibbonServer> getUpdatedListOfServers(String name) { + return getUpdatedListOfServers(); + } }