CAMEL-9683: A new toService EIP that uses client discovery to look up live 
services and pick a service ip/port to use when calling the service from a 
Camel route. Allows plugging in different providers.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/11589cbb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/11589cbb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/11589cbb

Branch: refs/heads/remoteServiceCall
Commit: 11589cbb8b1ceff398a1aedd96ca458749534a33
Parents: 1b7d303
Author: Claus Ibsen <davscl...@apache.org>
Authored: Tue Apr 26 10:45:46 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon May 23 09:23:19 2016 +0200

----------------------------------------------------------------------
 components/camel-kubernetes/pom.xml             |  5 ++
 .../kubernetes/KubernetesConstants.java         |  5 ++
 .../KubernetesServiceCallProcessor.java         | 63 ++++++++++++--
 .../processor/ServiceCallExpression.java        | 92 ++++++++++++++++++++
 .../processor/ServiceCallRouteTest.java         |  3 +-
 5 files changed, 157 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/11589cbb/components/camel-kubernetes/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/pom.xml 
b/components/camel-kubernetes/pom.xml
index 1d39ba4..699f9cd 100644
--- a/components/camel-kubernetes/pom.xml
+++ b/components/camel-kubernetes/pom.xml
@@ -50,6 +50,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-http</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>commons-codec</groupId>
       <artifactId>commons-codec</artifactId>
       <version>${commons-codec-version}</version>

http://git-wip-us.apache.org/repos/asf/camel/blob/11589cbb/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConstants.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConstants.java
index 3d78497..29a3fa9 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConstants.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConstants.java
@@ -17,6 +17,11 @@
 package org.apache.camel.component.kubernetes;
 
 public interface KubernetesConstants {
+
+    // Service Call EIP
+    String KUBERNETES_SERVER_IP = "CamelKubernetesServerIp";
+    String KUBERNETES_SERVER_PORT = "CamelKubernetesServerPort";
+
     // Producer
     String KUBERNETES_OPERATION = "CamelKubernetesOperation";
     String KUBERNETES_NAMESPACE_NAME = "CamelKubernetesNamespaceName";

http://git-wip-us.apache.org/repos/asf/camel/blob/11589cbb/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
index 48e4c7c..1583180 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
@@ -25,10 +25,14 @@ import io.fabric8.openshift.client.DefaultOpenShiftClient;
 import io.fabric8.openshift.client.OpenShiftClient;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Traceable;
 import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.processor.SendDynamicProcessor;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
@@ -40,28 +44,53 @@ import org.slf4j.LoggerFactory;
 /**
  * Kubernetes based implementation of the the ServiceCall EIP.
  */
-public class KubernetesServiceCallProcessor extends ServiceSupport implements 
AsyncProcessor, Traceable, IdAware {
+public class KubernetesServiceCallProcessor extends ServiceSupport implements 
AsyncProcessor, CamelContextAware, Traceable, IdAware {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesServiceCallProcessor.class);
 
+    private CamelContext camelContext;
     private String id;
     private final String name;
+    private final String scheme;
+    private final String contextPath;
     private final String namespace;
     private final String uri;
     private final ExchangePattern exchangePattern;
     private final KubernetesConfiguration configuration;
-
     private KubernetesServiceDiscovery discovery;
+
     private ServiceCallLoadBalancer loadBalancer = new RandomLoadBalancer();
+    private final ServiceCallExpression serviceCallExpression;
+    private SendDynamicProcessor processor;
 
     // TODO: allow to plugin custom load balancer like ribbon
 
     public KubernetesServiceCallProcessor(String name, String namespace, 
String uri, ExchangePattern exchangePattern, KubernetesConfiguration 
configuration) {
-        this.name = name;
+        // setup from the provided name which can contain scheme and 
context-path information as well
+        String serviceName;
+        if (name.contains("/")) {
+            serviceName = ObjectHelper.before(name, "/");
+            this.contextPath = ObjectHelper.after(name, "/");
+        } else if (name.contains("?")) {
+            serviceName = ObjectHelper.before(name, "?");
+            this.contextPath = ObjectHelper.after(name, "?");
+        } else {
+            serviceName = name;
+            this.contextPath = null;
+        }
+        if (serviceName.contains(":")) {
+            this.scheme = ObjectHelper.before(serviceName, ":");
+            this.name = ObjectHelper.after(serviceName, ":");
+        } else {
+            this.scheme = null;
+            this.name = serviceName;
+        }
+
         this.namespace = namespace;
         this.uri = uri;
         this.exchangePattern = exchangePattern;
         this.configuration = configuration;
+        this.serviceCallExpression = new ServiceCallExpression(this.name, 
this.scheme, this.contextPath, this.uri);
     }
 
     @Override
@@ -92,11 +121,22 @@ public class KubernetesServiceCallProcessor extends 
ServiceSupport implements As
         int port = server.getPort();
         LOG.debug("Random selected service {} active at server: {}:{}", name, 
ip, port);
 
-        // build uri based on the name
+        // set selected server as header
+        exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_SERVER_IP, 
ip);
+        exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_SERVER_PORT, 
port);
 
-        // TODO build uri
-        callback.done(true);
-        return true;
+        // use the dynamic send processor to call the service
+        return processor.process(exchange, callback);
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
     }
 
     @Override
@@ -121,12 +161,17 @@ public class KubernetesServiceCallProcessor extends 
ServiceSupport implements As
         ObjectHelper.notEmpty(configuration.getMasterUrl(), "masterUrl", this);
 
         discovery = new KubernetesServiceDiscovery(name, namespace, null, 
createKubernetesClient());
-        ServiceHelper.startService(discovery);
+        processor = new SendDynamicProcessor(uri, serviceCallExpression);
+        processor.setCamelContext(getCamelContext());
+        if (exchangePattern != null) {
+            processor.setPattern(exchangePattern);
+        }
+        ServiceHelper.startServices(discovery, processor);
     }
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(discovery);
+        ServiceHelper.stopServices(processor, discovery);
     }
 
     private OpenShiftClient createKubernetesClient() {

http://git-wip-us.apache.org/repos/asf/camel/blob/11589cbb/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallExpression.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallExpression.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallExpression.java
new file mode 100644
index 0000000..2cb1bf1
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallExpression.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.support.ExpressionAdapter;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ServiceCallExpression extends ExpressionAdapter {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ServiceCallExpression.class);
+
+    private final String name;
+    private final String scheme;
+    private final String contextPath;
+    private final String uri;
+
+    public ServiceCallExpression(String name, String scheme, String 
contextPath, String uri) {
+        this.name = name;
+        this.scheme = scheme;
+        this.contextPath = contextPath;
+        this.uri = uri;
+    }
+
+    @Override
+    public Object evaluate(Exchange exchange) {
+        try {
+            String ip = ExchangeHelper.getMandatoryHeader(exchange, 
KubernetesConstants.KUBERNETES_SERVER_IP, String.class);
+            int port = ExchangeHelper.getMandatoryHeader(exchange, 
KubernetesConstants.KUBERNETES_SERVER_PORT, int.class);
+            return buildCamelEndpointUri(ip, port, name, uri, contextPath, 
scheme);
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    protected static String buildCamelEndpointUri(String ip, int port, String 
name, String uri, String contextPath, String scheme) {
+        // serviceCall("myService") (will use http by default)
+        // serviceCall("myService/foo") (will use http by default)
+        // serviceCall("http:myService/foo")
+        // serviceCall("myService", "http:myService.host:myService.port/foo")
+        // serviceCall("myService", "netty4:tcp:myService?connectTimeout=1000")
+
+        // build basic uri if none provided
+        String answer = uri;
+        if (answer == null) {
+            if (scheme == null) {
+                if (port == 443) {
+                    scheme = "https";
+                } else {
+                    scheme = "http";
+                }
+            }
+            answer = scheme + "://" + ip + ":" + port;
+            if (contextPath != null) {
+                answer += "" + contextPath;
+            }
+        } else {
+            // we have existing uri, then replace the serviceName with ip:port
+            if (answer.contains(name + ".host")) {
+                answer = answer.replaceFirst(name + "\\.host", ip);
+            }
+            if (answer.contains(name + ".port")) {
+                answer = answer.replaceFirst(name + "\\.port", "" + port);
+            }
+            if (answer.contains(name)) {
+                answer = answer.replaceFirst(name, ip + ":" + port);
+            }
+        }
+
+        LOG.debug("Camel endpoint uri: {} for calling service: {} + on server 
{}:{}", answer, name, ip, port);
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/11589cbb/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallRouteTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallRouteTest.java
 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallRouteTest.java
index 6595b48..6164d0c 100644
--- 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallRouteTest.java
+++ 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/processor/ServiceCallRouteTest.java
@@ -42,11 +42,10 @@ public class ServiceCallRouteTest extends CamelTestSupport {
                 config.setMasterUrl("https://fabric8-master.vagrant.f8:8443");
                 config.setUsername("admin");
                 config.setPassword("admin");
-//                
config.setOauthToken("eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJkZWZhdWx0Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZWNyZXQubmFtZSI6ImZhYnJpYzgtdG9rZW4tZzNsdGoiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC5uYW1lIjoiZmFicmljOCIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50LnVpZCI6ImU0NGJhYzA0LWZmYjQtMTFlNS05MWM0LTA4MDAyN2I1YzJmNCIsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpkZWZhdWx0OmZhYnJpYzgifQ.yqhevtuqliAV7RlRhaSjG8oFSOn2V1vfmj5V9JKpaOCWbWXMYS0y_v4QPfI4vIGsJtpZgasrt-8brkiOkq7zx0BJxVm-Ae5QIE1uJNeWFYcno823SUV2ebHykhp0eUEtCmWtHByBIoTTF8dG3NZ6jWow7KVGN289Y2ryi8QoYupfQ9ABddVVcduolStIqBu3pu-dJqIvlt6L8wE6AHfhS4uSaPwcimbs5hrg6gB_iONCSCSayhOyiT6fNlXdpxndRRBg9MP3X3f4dD3kDyHE0860HzqZ05jFIwGfV_rbFJeNY3SLDQNO_QFXqUZKg01OH-OJaqDSjuV48P9b6n4uHA");
                 config.setNamespace("default");
 
                 from("direct:start")
-                    .serviceCall("cdi-camel-jetty", "http:cdi-camel-jetty", 
config)
+                    .serviceCall("cdi-camel-jetty", null, config)
                     .to("mock:result");
             }
         };

Reply via email to