CAMEL-9683: A new toService EIP that uses client-side discovery to look up alive services and pick a service ip/port to use when calling the service from a Camel route. Allows plugging in different providers.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/11589cbb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/11589cbb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/11589cbb Branch: refs/heads/remoteServiceCall Commit: 11589cbb8b1ceff398a1aedd96ca458749534a33 Parents: 1b7d303 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Apr 26 10:45:46 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon May 23 09:23:19 2016 +0200 ---------------------------------------------------------------------- components/camel-kubernetes/pom.xml | 5 ++ .../kubernetes/KubernetesConstants.java | 5 ++ .../KubernetesServiceCallProcessor.java | 63 ++++++++++++-- .../processor/ServiceCallExpression.java | 92 ++++++++++++++++++++ .../processor/ServiceCallRouteTest.java | 3 +- 5 files changed, 157 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/11589cbb/components/camel-kubernetes/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/pom.xml b/components/camel-kubernetes/pom.xml index 1d39ba4..699f9cd 100644 --- a/components/camel-kubernetes/pom.xml +++ b/components/camel-kubernetes/pom.xml @@ -50,6 +50,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-http</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>${commons-codec-version}</version> http://git-wip-us.apache.org/repos/asf/camel/blob/11589cbb/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConstants.java ---------------------------------------------------------------------- diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConstants.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConstants.java index 3d78497..29a3fa9 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConstants.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConstants.java @@ -17,6 +17,11 @@ package org.apache.camel.component.kubernetes; public interface KubernetesConstants { + + // Service Call EIP + String KUBERNETES_SERVER_IP = "CamelKubernetesServerIp"; + String KUBERNETES_SERVER_PORT = "CamelKubernetesServerPort"; + // Producer String KUBERNETES_OPERATION = "CamelKubernetesOperation"; String KUBERNETES_NAMESPACE_NAME = "CamelKubernetesNamespaceName"; http://git-wip-us.apache.org/repos/asf/camel/blob/11589cbb/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java index 48e4c7c..1583180 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java @@ -25,10 +25,14 @@ import io.fabric8.openshift.client.DefaultOpenShiftClient; import io.fabric8.openshift.client.OpenShiftClient; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; 
import org.apache.camel.ExchangePattern; import org.apache.camel.Traceable; import org.apache.camel.component.kubernetes.KubernetesConfiguration; +import org.apache.camel.component.kubernetes.KubernetesConstants; +import org.apache.camel.processor.SendDynamicProcessor; import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; @@ -40,28 +44,53 @@ import org.slf4j.LoggerFactory; /** * Kubernetes based implementation of the the ServiceCall EIP. */ -public class KubernetesServiceCallProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { +public class KubernetesServiceCallProcessor extends ServiceSupport implements AsyncProcessor, CamelContextAware, Traceable, IdAware { private static final Logger LOG = LoggerFactory.getLogger(KubernetesServiceCallProcessor.class); + private CamelContext camelContext; private String id; private final String name; + private final String scheme; + private final String contextPath; private final String namespace; private final String uri; private final ExchangePattern exchangePattern; private final KubernetesConfiguration configuration; - private KubernetesServiceDiscovery discovery; + private ServiceCallLoadBalancer loadBalancer = new RandomLoadBalancer(); + private final ServiceCallExpression serviceCallExpression; + private SendDynamicProcessor processor; // TODO: allow to plugin custom load balancer like ribbon public KubernetesServiceCallProcessor(String name, String namespace, String uri, ExchangePattern exchangePattern, KubernetesConfiguration configuration) { - this.name = name; + // setup from the provided name which can contain scheme and context-path information as well + String serviceName; + if (name.contains("/")) { + serviceName = ObjectHelper.before(name, "/"); + this.contextPath = ObjectHelper.after(name, "/"); + } else if (name.contains("?")) { + serviceName = ObjectHelper.before(name, "?"); + this.contextPath = 
ObjectHelper.after(name, "?"); + } else { + serviceName = name; + this.contextPath = null; + } + if (serviceName.contains(":")) { + this.scheme = ObjectHelper.before(serviceName, ":"); + this.name = ObjectHelper.after(serviceName, ":"); + } else { + this.scheme = null; + this.name = serviceName; + } + this.namespace = namespace; this.uri = uri; this.exchangePattern = exchangePattern; this.configuration = configuration; + this.serviceCallExpression = new ServiceCallExpression(this.name, this.scheme, this.contextPath, this.uri); } @Override @@ -92,11 +121,22 @@ public class KubernetesServiceCallProcessor extends ServiceSupport implements As int port = server.getPort(); LOG.debug("Random selected service {} active at server: {}:{}", name, ip, port); - // build uri based on the name + // set selected server as header + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_SERVER_IP, ip); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_SERVER_PORT, port); - // TODO build uri - callback.done(true); - return true; + // use the dynamic send processor to call the service + return processor.process(exchange, callback); + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; } @Override @@ -121,12 +161,17 @@ public class KubernetesServiceCallProcessor extends ServiceSupport implements As ObjectHelper.notEmpty(configuration.getMasterUrl(), "masterUrl", this); discovery = new KubernetesServiceDiscovery(name, namespace, null, createKubernetesClient()); - ServiceHelper.startService(discovery); + processor = new SendDynamicProcessor(uri, serviceCallExpression); + processor.setCamelContext(getCamelContext()); + if (exchangePattern != null) { + processor.setPattern(exchangePattern); + } + ServiceHelper.startServices(discovery, processor); } @Override protected void doStop() throws Exception { - 
ServiceHelper.stopService(discovery); + ServiceHelper.stopServices(processor, discovery); } private OpenShiftClient createKubernetesClient() { http://git-wip-us.apache.org/repos/asf/camel/blob/11589cbb/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallExpression.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallExpression.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallExpression.java new file mode 100644 index 0000000..2cb1bf1 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallExpression.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kubernetes.processor; + +import org.apache.camel.Exchange; +import org.apache.camel.component.kubernetes.KubernetesConstants; +import org.apache.camel.support.ExpressionAdapter; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ServiceCallExpression extends ExpressionAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceCallExpression.class); + + private final String name; + private final String scheme; + private final String contextPath; + private final String uri; + + public ServiceCallExpression(String name, String scheme, String contextPath, String uri) { + this.name = name; + this.scheme = scheme; + this.contextPath = contextPath; + this.uri = uri; + } + + @Override + public Object evaluate(Exchange exchange) { + try { + String ip = ExchangeHelper.getMandatoryHeader(exchange, KubernetesConstants.KUBERNETES_SERVER_IP, String.class); + int port = ExchangeHelper.getMandatoryHeader(exchange, KubernetesConstants.KUBERNETES_SERVER_PORT, int.class); + return buildCamelEndpointUri(ip, port, name, uri, contextPath, scheme); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + + protected static String buildCamelEndpointUri(String ip, int port, String name, String uri, String contextPath, String scheme) { + // serviceCall("myService") (will use http by default) + // serviceCall("myService/foo") (will use http by default) + // serviceCall("http:myService/foo") + // serviceCall("myService", "http:myService.host:myService.port/foo") + // serviceCall("myService", "netty4:tcp:myService?connectTimeout=1000") + + // build basic uri if none provided + String answer = uri; + if (answer == null) { + if (scheme == null) { + if (port == 443) { + scheme = "https"; + } else { + scheme = "http"; + } + } + answer = scheme + "://" + ip + ":" + port; + if (contextPath != null) 
{ + answer += "" + contextPath; + } + } else { + // we have existing uri, then replace the serviceName with ip:port + if (answer.contains(name + ".host")) { + answer = answer.replaceFirst(name + "\\.host", ip); + } + if (answer.contains(name + ".port")) { + answer = answer.replaceFirst(name + "\\.port", "" + port); + } + if (answer.contains(name)) { + answer = answer.replaceFirst(name, ip + ":" + port); + } + } + + LOG.debug("Camel endpoint uri: {} for calling service: {} + on server {}:{}", answer, name, ip, port); + return answer; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/11589cbb/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallRouteTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallRouteTest.java index 6595b48..6164d0c 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallRouteTest.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallRouteTest.java @@ -42,11 +42,10 @@ public class ServiceCallRouteTest extends CamelTestSupport { config.setMasterUrl("https://fabric8-master.vagrant.f8:8443"); config.setUsername("admin"); config.setPassword("admin"); -// 
config.setOauthToken("eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJkZWZhdWx0Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZWNyZXQubmFtZSI6ImZhYnJpYzgtdG9rZW4tZzNsdGoiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC5uYW1lIjoiZmFicmljOCIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50LnVpZCI6ImU0NGJhYzA0LWZmYjQtMTFlNS05MWM0LTA4MDAyN2I1YzJmNCIsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpkZWZhdWx0OmZhYnJpYzgifQ.yqhevtuqliAV7RlRhaSjG8oFSOn2V1vfmj5V9JKpaOCWbWXMYS0y_v4QPfI4vIGsJtpZgasrt-8brkiOkq7zx0BJxVm-Ae5QIE1uJNeWFYcno823SUV2ebHykhp0eUEtCmWtHByBIoTTF8dG3NZ6jWow7KVGN289Y2ryi8QoYupfQ9ABddVVcduolStIqBu3pu-dJqIvlt6L8wE6AHfhS4uSaPwcimbs5hrg6gB_iONCSCSayhOyiT6fNlXdpxndRRBg9MP3X3f4dD3kDyHE0860HzqZ05jFIwGfV_rbFJeNY3SLDQNO_QFXqUZKg01OH-OJaqDSjuV48P9b6n4uHA"); config.setNamespace("default"); from("direct:start") - .serviceCall("cdi-camel-jetty", "http:cdi-camel-jetty", config) + .serviceCall("cdi-camel-jetty", null, config) .to("mock:result"); } };