CAMEL-9683: Started on camel-ribbon
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/81f8397d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/81f8397d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/81f8397d Branch: refs/heads/remoteServiceCall Commit: 81f8397d2178b5fa913c65529786cf175f22de08 Parents: 9d44cfc Author: Claus Ibsen <davscl...@apache.org> Authored: Mon May 16 14:45:04 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon May 23 09:27:00 2016 +0200 ---------------------------------------------------------------------- .../ServiceCallConfigurationDefinition.java | 30 +++++++ .../processor/RibbonProcessorFactory.java | 25 ++++++ .../ribbon/processor/RibbonServer.java | 1 + .../processor/RibbonServiceCallProcessor.java | 35 ++++++-- ...bbonServiceCallStaticServerListStrategy.java | 17 ++++ .../RibbonServiceCallUpdateRouteTest.java | 90 ++++++++++++++++++++ .../SpringRibbonServiceCallRouteTest.xml | 18 +--- 7 files changed, 196 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/81f8397d/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java index 9b38565..4be884d 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java @@ -16,9 +16,12 @@ */ package org.apache.camel.model; +import java.util.ArrayList; +import java.util.List; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlTransient; @@ -73,6 +76,8 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType { private String serverListStrategyRef; @XmlTransient private ServiceCallServerListStrategy serverListStrategy; + @XmlElement(name = "clientProperty") @Metadata(label = "advanced") + private List<PropertyDefinition> properties; public ServiceCallConfigurationDefinition() { } @@ -245,6 +250,14 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType { this.serverListStrategy = serverListStrategy; } + public List<PropertyDefinition> getProperties() { + return properties; + } + + public void setProperties(List<PropertyDefinition> properties) { + this.properties = properties; + } + // Fluent API // ------------------------------------------------------------------------- @@ -409,6 +422,23 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType { } /** + * Adds a custom client property to use. + * <p/> + * These properties are specific to what service call implementation are in use. For example if using ribbon, then + * the client properties are define in com.netflix.client.config.CommonClientConfigKey. + */ + public ServiceCallConfigurationDefinition clientProperty(String key, String value) { + if (properties == null) { + properties = new ArrayList<>(); + } + PropertyDefinition prop = new PropertyDefinition(); + prop.setKey(key); + prop.setValue(value); + properties.add(prop); + return this; + } + + /** * End of configuration */ public ProcessorDefinition end() { http://git-wip-us.apache.org/repos/asf/camel/blob/81f8397d/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonProcessorFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonProcessorFactory.java b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonProcessorFactory.java index 5a3f685..84b6bb5 100644 --- a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonProcessorFactory.java +++ b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonProcessorFactory.java @@ -25,6 +25,7 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; import org.apache.camel.component.ribbon.RibbonConfiguration; import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.PropertyDefinition; import org.apache.camel.model.ServiceCallConfigurationDefinition; import org.apache.camel.model.ServiceCallDefinition; import org.apache.camel.spi.ProcessorFactory; @@ -124,15 +125,39 @@ public class RibbonProcessorFactory implements ProcessorFactory { throw new IllegalArgumentException("Load balancer must be of type: " + IRule.class + " but is of type: " + lb.getClass().getName()); } + Map<String, String> properties = configureProperties(routeContext, config, configRef); + RibbonServiceCallProcessor processor = new RibbonServiceCallProcessor(name, namespace, uri, mep, rc); processor.setRule((IRule) lb); processor.setServerListStrategy(sl); + processor.setRibbonClientConfig(properties); return processor; } else { return null; } } + private Map<String, String> configureProperties(RouteContext routeContext, ServiceCallConfigurationDefinition config, ServiceCallConfigurationDefinition configRef) throws Exception { + Map<String, String> answer = new HashMap<>(); + if (config != null && config.getProperties() != null) { + for (PropertyDefinition prop : config.getProperties()) { + // support property placeholders + String key = CamelContextHelper.parseText(routeContext.getCamelContext(), prop.getKey()); + String value = CamelContextHelper.parseText(routeContext.getCamelContext(), prop.getValue()); + answer.put(key, value); + } + } + if (configRef != null && configRef.getProperties() != null) { + for (PropertyDefinition prop : configRef.getProperties()) { + // support property placeholders + String key = CamelContextHelper.parseText(routeContext.getCamelContext(), prop.getKey()); + String value = CamelContextHelper.parseText(routeContext.getCamelContext(), prop.getValue()); + answer.put(key, value); + } + } + return answer; + } + private Object configureLoadBalancer(RouteContext routeContext, ServiceCallDefinition sd) { Object lb = null; http://git-wip-us.apache.org/repos/asf/camel/blob/81f8397d/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServer.java ---------------------------------------------------------------------- diff --git a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServer.java b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServer.java index 8a545a8..d4e5bbf 100644 --- a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServer.java +++ b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServer.java @@ -29,4 +29,5 @@ public class RibbonServer extends Server implements ServiceCallServer { public String getIp() { return getHost(); } + } http://git-wip-us.apache.org/repos/asf/camel/blob/81f8397d/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallProcessor.java b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallProcessor.java index 5e77c51..290149a 100644 --- a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallProcessor.java +++ b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallProcessor.java @@ -16,13 +16,18 @@ */ package org.apache.camel.component.ribbon.processor; +import java.util.Map; import java.util.concurrent.RejectedExecutionException; +import com.netflix.client.config.IClientConfig; +import com.netflix.client.config.IClientConfigKey; +import com.netflix.loadbalancer.DummyPing; import com.netflix.loadbalancer.IRule; -import com.netflix.loadbalancer.LoadBalancerBuilder; +import com.netflix.loadbalancer.PollingServerListUpdater; import com.netflix.loadbalancer.RoundRobinRule; import com.netflix.loadbalancer.Server; import com.netflix.loadbalancer.ServerList; +import com.netflix.loadbalancer.ServerListUpdater; import com.netflix.loadbalancer.ZoneAwareLoadBalancer; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; @@ -63,6 +68,7 @@ public class RibbonServiceCallProcessor extends ServiceSupport implements AsyncP private ZoneAwareLoadBalancer<RibbonServer> ribbonLoadBalancer; private IRule rule; private final RibbonServiceCallExpression serviceCallExpression; + private Map<String, String> ribbonClientConfig; private SendDynamicProcessor processor; public RibbonServiceCallProcessor(String name, String namespace, String uri, ExchangePattern exchangePattern, RibbonConfiguration configuration) { @@ -169,6 +175,14 @@ public class RibbonServiceCallProcessor extends ServiceSupport implements AsyncP this.rule = rule; } + public Map<String, String> getRibbonClientConfig() { + return ribbonClientConfig; + } + + public void setRibbonClientConfig(Map<String, String> ribbonClientConfig) { + this.ribbonClientConfig = ribbonClientConfig; + } + @Override @SuppressWarnings("unchecked") protected void doStart() throws Exception { @@ -187,12 +201,21 @@ public class RibbonServiceCallProcessor extends ServiceSupport implements AsyncP rule = new RoundRobinRule(); } - ribbonLoadBalancer = LoadBalancerBuilder.<RibbonServer>newBuilder() - .withDynamicServerList((ServerList<RibbonServer>) serverListStrategy) - .withRule(rule) - .buildDynamicServerListLoadBalancer(); + // setup client config + IClientConfig config = IClientConfig.Builder.newBuilder().build(); + if (ribbonClientConfig != null) { + for (Map.Entry<String, String> entry : ribbonClientConfig.entrySet()) { + IClientConfigKey key = IClientConfigKey.Keys.valueOf(entry.getKey()); + String value = entry.getValue(); + LOG.debug("RibbonClientConfig: {}={}", key.key(), value); + config.set(key, entry.getValue()); + } + } + + ServerListUpdater updater = new PollingServerListUpdater(config); + ribbonLoadBalancer = new ZoneAwareLoadBalancer<>(config, rule, new DummyPing(), (ServerList<RibbonServer>) serverListStrategy, null, updater); - LOG.info("RibbonServiceCall at namespace: {} with service name: {} is using load balancer: {} and service discovery: {}", namespace, name, ribbonLoadBalancer, serverListStrategy); + LOG.info("RibbonServiceCall at namespace: {} with service name: {} is using load balancer: {} and server list: {}", namespace, name, ribbonLoadBalancer, serverListStrategy); processor = new SendDynamicProcessor(uri, serviceCallExpression); processor.setCamelContext(getCamelContext()); http://git-wip-us.apache.org/repos/asf/camel/blob/81f8397d/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallStaticServerListStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallStaticServerListStrategy.java b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallStaticServerListStrategy.java index 5891360..a836900 100644 --- a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallStaticServerListStrategy.java +++ b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallStaticServerListStrategy.java @@ -23,12 +23,25 @@ import com.netflix.client.config.IClientConfig; import com.netflix.loadbalancer.AbstractServerList; import com.netflix.loadbalancer.ServerList; import org.apache.camel.spi.ServiceCallServerListStrategy; +import org.apache.camel.util.ObjectHelper; public class RibbonServiceCallStaticServerListStrategy extends AbstractServerList<RibbonServer> implements ServerList<RibbonServer>, ServiceCallServerListStrategy<RibbonServer> { private IClientConfig clientConfig; private final List<RibbonServer> servers = new ArrayList<>(); + public static RibbonServiceCallStaticServerListStrategy build(String servers) { + RibbonServiceCallStaticServerListStrategy answer = new RibbonServiceCallStaticServerListStrategy(); + String[] parts = servers.split(","); + for (String part : parts) { + String host = ObjectHelper.before(part, ":"); + String port = ObjectHelper.after(part, ":"); + int num = Integer.valueOf(port); + answer.addServer(host, num); + } + return answer; + } + public RibbonServiceCallStaticServerListStrategy() { } @@ -44,6 +57,10 @@ public class RibbonServiceCallStaticServerListStrategy extends AbstractServerLis servers.add(new RibbonServer(host, port)); } + public void removeServer(String host, int port) { + servers.remove(new RibbonServer(host, port)); + } + @Override public void initWithNiwsConfig(IClientConfig clientConfig) { this.clientConfig = clientConfig; http://git-wip-us.apache.org/repos/asf/camel/blob/81f8397d/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallUpdateRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallUpdateRouteTest.java b/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallUpdateRouteTest.java new file mode 100644 index 0000000..2ff22ac --- /dev/null +++ b/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallUpdateRouteTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ribbon.processor; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class RibbonServiceCallUpdateRouteTest extends CamelTestSupport { + + private final RibbonServiceCallStaticServerListStrategy servers = new RibbonServiceCallStaticServerListStrategy(); + + @Override + public void setUp() throws Exception { + // setup a static ribbon server list with these 2 servers to start with + servers.addServer("localhost", 9090); + servers.addServer("localhost", 9091); + + super.setUp(); + } + + @Test + public void testServiceCall() throws Exception { + getMockEndpoint("mock:9090").expectedMessageCount(1); + getMockEndpoint("mock:9091").expectedMessageCount(1); + getMockEndpoint("mock:result").expectedMessageCount(2); + + String out = template.requestBody("direct:start", null, String.class); + String out2 = template.requestBody("direct:start", null, String.class); + assertEquals("9091", out); + assertEquals("9090", out2); + + assertMockEndpointsSatisfied(); + + // stop the first server and remove it from the known list of servers + context.stopRoute("9090"); + servers.removeServer("localhost", 9090); + + // call the other active server + String out3 = template.requestBody("direct:start", null, String.class); + assertEquals("9091", out3); + + // sleep a bit to make the server updated run and detect that a server is no longer in the list + log.debug("Sleeping to all the server list updated to run"); + Thread.sleep(1000); + log.debug("Calling the service now"); + + // call again and it should call 9091 as its the only active server + String out4 = template.requestBody("direct:start", null, String.class); + assertEquals("9091", out4); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .serviceCall().name("myService") + // lets update quick so we do not have to sleep so much in the tests + .serviceCallConfiguration().serverListStrategy(servers).clientProperty("ServerListRefreshInterval", "250").end() + .to("mock:result"); + + from("jetty:http://localhost:9090").routeId("9090") + .to("mock:9090") + .transform().constant("9090"); + + from("jetty:http://localhost:9091").routeId("9091") + .to("mock:9091") + .transform().constant("9091"); + } + }; + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/81f8397d/components/camel-ribbon/src/test/resources/org/apache/camel/component/ribbon/processor/SpringRibbonServiceCallRouteTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-ribbon/src/test/resources/org/apache/camel/component/ribbon/processor/SpringRibbonServiceCallRouteTest.xml b/components/camel-ribbon/src/test/resources/org/apache/camel/component/ribbon/processor/SpringRibbonServiceCallRouteTest.xml index a439f23..487fa0c 100644 --- a/components/camel-ribbon/src/test/resources/org/apache/camel/component/ribbon/processor/SpringRibbonServiceCallRouteTest.xml +++ b/components/camel-ribbon/src/test/resources/org/apache/camel/component/ribbon/processor/SpringRibbonServiceCallRouteTest.xml @@ -22,21 +22,11 @@ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> - <!-- TODO: should be nicer to do in XML--> <!-- setup a static ribbon server list with these 2 servers to start with --> - <bean id="servers" class="org.apache.camel.component.ribbon.processor.RibbonServiceCallStaticServerListStrategy"> - <constructor-arg index="0"> - <list> - <bean class="org.apache.camel.component.ribbon.processor.RibbonServer"> - <constructor-arg index="0" value="localhost"/> - <constructor-arg index="1" value="9090"/> - </bean> - <bean class="org.apache.camel.component.ribbon.processor.RibbonServer"> - <constructor-arg index="0" value="localhost"/> - <constructor-arg index="1" value="9091"/> - </bean> - </list> - </constructor-arg> + <!-- call the build method with the argument below --> + <bean id="servers" class="org.apache.camel.component.ribbon.processor.RibbonServiceCallStaticServerListStrategy" + factory-method="build"> + <constructor-arg index="0" value="localhost:9090,localhost:9091"/> </bean> <camelContext xmlns="http://camel.apache.org/schema/spring">