Repository: camel
Updated Branches:
  refs/heads/remoteServiceCall [created] 2801efd99


CAMEL-9683: A new toService EIP that uses a client discovery to look up alive 
services and pick a service ip/port to use when calling the service from a Camel 
route. Allows plugging in different providers.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e46a2369
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e46a2369
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e46a2369

Branch: refs/heads/remoteServiceCall
Commit: e46a2369692bcbb30785697110428e80d454be9c
Parents: c44c0b0
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Apr 22 14:14:49 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon May 23 09:22:11 2016 +0200

----------------------------------------------------------------------
 .../apache/camel/model/ProcessorDefinition.java |  16 +
 .../model/ToServiceConfigurationDefinition.java | 308 +++++++++++++++++++
 .../apache/camel/model/ToServiceDefinition.java | 201 ++++++++++++
 .../resources/org/apache/camel/model/jaxb.index |   2 +
 .../kubernetes/KubernetesEndpoint.java          |  21 +-
 .../processor/KubernetesProcessorFactory.java   |  80 +++++
 .../processor/KubernetesServiceDiscovery.java   |  90 ++++++
 .../processor/KubernetesServiceProcessor.java   | 173 +++++++++++
 .../component/kubernetes/processor/Server.java  |  36 +++
 .../org/apache/camel/model/ToServiceDefinition  |  18 ++
 10 files changed, 933 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e46a2369/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 2270ef3..2acf785 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -654,6 +654,22 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
     }
 
     /**
+     * Sends the exchange to the given service
+     *
+     * @param name the service name
+     * @param uri  the endpoint uri to use for calling the service
+     * @return the builder
+     */
+    @SuppressWarnings("unchecked")
+    public Type toService(String name, String uri) {
+        ToServiceDefinition answer = new ToServiceDefinition();
+        answer.setName(name);
+        answer.setUri(uri);
+        addOutput(answer);
+        return (Type) this;
+    }
+
+    /**
      * Sends the exchange to the given endpoint
      *
      * @param endpoint  the endpoint to send to

http://git-wip-us.apache.org/repos/asf/camel/blob/e46a2369/camel-core/src/main/java/org/apache/camel/model/ToServiceConfigurationDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/ToServiceConfigurationDefinition.java
 
b/camel-core/src/main/java/org/apache/camel/model/ToServiceConfigurationDefinition.java
new file mode 100644
index 0000000..53c63f2
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/model/ToServiceConfigurationDefinition.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.spi.Metadata;
+
+@Metadata(label = "eip,routing")
+@XmlRootElement(name = "toServiceConfiguration")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ToServiceConfigurationDefinition extends IdentifiedType {
+
+    @XmlTransient
+    private ToServiceDefinition parent;
+    @XmlAttribute @Metadata(required = "true")
+    private String masterUrl;
+    @XmlAttribute
+    private String apiVersion;
+    @XmlAttribute @Metadata(label = "security")
+    private String username;
+    @XmlAttribute @Metadata(label = "security")
+    private String password;
+    @XmlAttribute @Metadata(label = "security")
+    private String oauthToken;
+    @XmlAttribute @Metadata(label = "security")
+    private String caCertData;
+    @XmlAttribute @Metadata(label = "security")
+    private String caCertFile;
+    @XmlAttribute @Metadata(label = "security")
+    private String clientCertData;
+    @XmlAttribute @Metadata(label = "security")
+    private String clientCertFile;
+    @XmlAttribute @Metadata(label = "security")
+    private String clientKeyAlgo;
+    @XmlAttribute @Metadata(label = "security")
+    private String clientKeyData;
+    @XmlAttribute @Metadata(label = "security")
+    private String clientKeyFile;
+    @XmlAttribute @Metadata(label = "security")
+    private String clientKeyPassphrase;
+    @XmlAttribute @Metadata(label = "security")
+    private Boolean trustCerts;
+
+    public ToServiceConfigurationDefinition() {
+    }
+
+    public ToServiceConfigurationDefinition(ToServiceDefinition parent) {
+        this.parent = parent;
+    }
+
+    // Getter/Setter
+    // 
-------------------------------------------------------------------------
+
+    public String getMasterUrl() {
+        return masterUrl;
+    }
+
+    public void setMasterUrl(String masterUrl) {
+        this.masterUrl = masterUrl;
+    }
+
+    public String getApiVersion() {
+        return apiVersion;
+    }
+
+    public void setApiVersion(String apiVersion) {
+        this.apiVersion = apiVersion;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getCaCertData() {
+        return caCertData;
+    }
+
+    public void setCaCertData(String caCertData) {
+        this.caCertData = caCertData;
+    }
+
+    public String getCaCertFile() {
+        return caCertFile;
+    }
+
+    public void setCaCertFile(String caCertFile) {
+        this.caCertFile = caCertFile;
+    }
+
+    public String getClientCertData() {
+        return clientCertData;
+    }
+
+    public void setClientCertData(String clientCertData) {
+        this.clientCertData = clientCertData;
+    }
+
+    public String getClientCertFile() {
+        return clientCertFile;
+    }
+
+    public void setClientCertFile(String clientCertFile) {
+        this.clientCertFile = clientCertFile;
+    }
+
+    public String getClientKeyAlgo() {
+        return clientKeyAlgo;
+    }
+
+    public void setClientKeyAlgo(String clientKeyAlgo) {
+        this.clientKeyAlgo = clientKeyAlgo;
+    }
+
+    public String getClientKeyData() {
+        return clientKeyData;
+    }
+
+    public void setClientKeyData(String clientKeyData) {
+        this.clientKeyData = clientKeyData;
+    }
+
+    public String getClientKeyFile() {
+        return clientKeyFile;
+    }
+
+    public void setClientKeyFile(String clientKeyFile) {
+        this.clientKeyFile = clientKeyFile;
+    }
+
+    public String getClientKeyPassphrase() {
+        return clientKeyPassphrase;
+    }
+
+    public void setClientKeyPassphrase(String clientKeyPassphrase) {
+        this.clientKeyPassphrase = clientKeyPassphrase;
+    }
+
+    public String getOauthToken() {
+        return oauthToken;
+    }
+
+    public void setOauthToken(String oauthToken) {
+        this.oauthToken = oauthToken;
+    }
+
+    public Boolean getTrustCerts() {
+        return trustCerts;
+    }
+
+    public void setTrustCerts(Boolean trustCerts) {
+        this.trustCerts = trustCerts;
+    }
+
+
+    // Fluent API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the URL to the master
+     */
+    public ToServiceConfigurationDefinition masterUrl(String masterUrl) {
+        setMasterUrl(masterUrl);
+        return this;
+    }
+
+    /**
+     * Sets the API version
+     */
+    public ToServiceConfigurationDefinition apiVersion(String apiVersion) {
+        setApiVersion(apiVersion);
+        return this;
+    }
+
+    /**
+     * Sets the username for authentication
+     */
+    public ToServiceConfigurationDefinition username(String username) {
+        setUsername(username);
+        return this;
+    }
+
+    /**
+     * Sets the password for authentication
+     */
+    public ToServiceConfigurationDefinition password(String password) {
+        setPassword(password);
+        return this;
+    }
+
+    /**
+     * Sets the OAUTH token for authentication (instead of username/password)
+     */
+    public ToServiceConfigurationDefinition oauthToken(String oauthToken) {
+        setOauthToken(oauthToken);
+        return this;
+    }
+
+    /**
+     * Sets the Certificate Authority data
+     */
+    public ToServiceConfigurationDefinition caCertData(String caCertData) {
+        setCaCertData(caCertData);
+        return this;
+    }
+
+    /**
+     * Sets the Certificate Authority data that are loaded from the file
+     */
+    public ToServiceConfigurationDefinition caCertFile(String caCertFile) {
+        setCaCertFile(caCertFile);
+        return this;
+    }
+
+    /**
+     * Sets the Client Certificate data
+     */
+    public ToServiceConfigurationDefinition clientCertData(String 
clientCertData) {
+        setClientCertData(clientCertData);
+        return this;
+    }
+
+    /**
+     * Sets the Client Certificate data that are loaded from the file
+     */
+    public ToServiceConfigurationDefinition clientCertFile(String 
clientCertFile) {
+        setClientCertFile(clientCertFile);
+        return this;
+    }
+
+    /**
+     * Sets the Client Keystore algorithm, such as RSA.
+     */
+    public ToServiceConfigurationDefinition clientKeyAlgo(String 
clientKeyAlgo) {
+        setClientKeyAlgo(clientKeyAlgo);
+        return this;
+    }
+
+    /**
+     * Sets the Client Keystore data
+     */
+    public ToServiceConfigurationDefinition clientKeyData(String 
clientKeyData) {
+        setClientKeyData(clientKeyData);
+        return this;
+    }
+
+    /**
+     * Sets the Client Keystore data that are loaded from the file
+     */
+    public ToServiceConfigurationDefinition clientKeyFile(String 
clientKeyFile) {
+        setClientKeyFile(clientKeyFile);
+        return this;
+    }
+
+    /**
+     * Sets the Client Keystore passphrase
+     */
+    public ToServiceConfigurationDefinition clientKeyPassphrase(String 
clientKeyPassphrase) {
+        setClientKeyPassphrase(clientKeyPassphrase);
+        return this;
+    }
+
+    /**
+     * Sets whether to turn on trust certificate check
+     */
+    public ToServiceConfigurationDefinition trustCerts(boolean trustCerts) {
+        setTrustCerts(trustCerts);
+        return this;
+    }
+
+    /**
+     * End of configuration
+     */
+    public ToServiceDefinition end() {
+        return parent;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e46a2369/camel-core/src/main/java/org/apache/camel/model/ToServiceDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/ToServiceDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/ToServiceDefinition.java
new file mode 100644
index 0000000..1c5dfc4
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/model/ToServiceDefinition.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.RouteContext;
+
+@Metadata(label = "eip,routing")
+@XmlRootElement(name = "toService")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ToServiceDefinition extends 
NoOutputDefinition<ToServiceDefinition> {
+
+    // TODO: load balancing strategy
+
+    @XmlElement
+    private ToServiceConfigurationDefinition toServiceConfiguration;
+    @XmlAttribute @Metadata(required = "true")
+    private String uri;
+    @XmlAttribute
+    private ExchangePattern pattern;
+    @XmlAttribute @Metadata(defaultValue = "default")
+    private String namespace;
+    @XmlAttribute @Metadata(required = "true")
+    private String name;
+    @XmlAttribute
+    private String discovery;
+    @XmlAttribute
+    private String toServiceConfigurationRef;
+
+    public ToServiceDefinition() {
+    }
+
+    // toService("myService") (will use http by default)
+    // toService("myService/foo") (will use http by default)
+    // toService("http:myService/foo")
+    // toService("myService", "http:myService.host:myService.port/foo")
+    // toService("myService", "netty4:tcp:myService?connectTimeout=1000")
+
+    @Override
+    public String toString() {
+        return "ToService[" + name + "]";
+    }
+
+    @Override
+    public String getLabel() {
+        return "toService";
+    }
+
+    @Override
+    public Processor createProcessor(RouteContext routeContext) throws 
Exception {
+        if (discovery != null) {
+            throw new IllegalStateException("Cannot find Camel component on 
the classpath implementing the discovery provider: " + discovery);
+        } else {
+            throw new IllegalStateException("Cannot find Camel component 
supporting the ToService EIP. Add camel-kubernetes if you are using 
Kubernetes.");
+        }
+    }
+
+    // Fluent API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the optional {@link ExchangePattern} used to invoke this endpoint
+     */
+    public ToServiceDefinition pattern(ExchangePattern pattern) {
+        setPattern(pattern);
+        return this;
+    }
+
+    /**
+     * Sets the namespace of the service to use
+     */
+    public ToServiceDefinition namespace(String namespace) {
+        setNamespace(namespace);
+        return this;
+    }
+
+    /**
+     * Sets the name of the service to use
+     */
+    public ToServiceDefinition name(String name) {
+        setName(name);
+        return this;
+    }
+
+    /**
+     * Sets the discovery provider to use.
+     * <p/>
+     * Use kubernetes to use kubernetes.
+     * Use ribbon to use ribbon.
+     */
+    public ToServiceDefinition discovery(String discovery) {
+        setDiscovery(discovery);
+        return this;
+    }
+
+    public ToServiceConfigurationDefinition toServiceConfiguration() {
+        toServiceConfiguration = new ToServiceConfigurationDefinition(this);
+        return toServiceConfiguration;
+    }
+
+    /**
+     * Configures the ToService EIP using the given configuration
+     */
+    public ToServiceDefinition 
toServiceConfiguration(ToServiceConfigurationDefinition configuration) {
+        toServiceConfiguration = configuration;
+        return this;
+    }
+
+    /**
+     * Refers to a ToService configuration to use for configuring the ToService 
EIP.
+     */
+    public ToServiceDefinition toServiceConfiguration(String ref) {
+        toServiceConfigurationRef = ref;
+        return this;
+    }
+
+    // Properties
+    // 
-------------------------------------------------------------------------
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    public ExchangePattern getPattern() {
+        return pattern;
+    }
+
+    public void setPattern(ExchangePattern pattern) {
+        this.pattern = pattern;
+    }
+
+    public String getDiscovery() {
+        return discovery;
+    }
+
+    public void setDiscovery(String discovery) {
+        this.discovery = discovery;
+    }
+
+    public ToServiceConfigurationDefinition getToServiceConfiguration() {
+        return toServiceConfiguration;
+    }
+
+    public void setToServiceConfiguration(ToServiceConfigurationDefinition 
toServiceConfiguration) {
+        this.toServiceConfiguration = toServiceConfiguration;
+    }
+
+    public String getToServiceConfigurationRef() {
+        return toServiceConfigurationRef;
+    }
+
+    public void setToServiceConfigurationRef(String toServiceConfigurationRef) 
{
+        this.toServiceConfigurationRef = toServiceConfigurationRef;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    /**
+     * The uri of the endpoint to send to.
+     * The uri can be dynamically computed using the {@link 
org.apache.camel.language.simple.SimpleLanguage} expression.
+     */
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e46a2369/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
----------------------------------------------------------------------
diff --git a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index 
b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
index 9859736..549557f 100644
--- a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
+++ b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
@@ -89,6 +89,8 @@ ThrottleDefinition
 ThrowExceptionDefinition
 ToDefinition
 ToDynamicDefinition
+ToServiceDefinition
+ToServiceConfigurationDefinition
 TransactedDefinition
 TransformDefinition
 TryDefinition

http://git-wip-us.apache.org/repos/asf/camel/blob/e46a2369/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
index c0312a3..3204f5f 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
@@ -21,7 +21,7 @@ import java.util.concurrent.ExecutorService;
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.ConfigBuilder;
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
-
+import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -61,7 +61,7 @@ public class KubernetesEndpoint extends DefaultEndpoint {
     @UriParam
     private KubernetesConfiguration configuration;
 
-    private DefaultKubernetesClient client;
+    private transient KubernetesClient client;
 
     public KubernetesEndpoint(String uri, KubernetesComponent component, 
KubernetesConfiguration config) {
         super(uri, component);
@@ -160,22 +160,22 @@ public class KubernetesEndpoint extends DefaultEndpoint {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-
-        client = configuration.getKubernetesClient() != null ? 
configuration.getKubernetesClient()
-                : createKubernetesClient();
+        client = configuration.getKubernetesClient() != null ? 
configuration.getKubernetesClient() : createKubernetesClient();
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        client.close();
+        if (client != null) {
+            client.close();
+        }
     }
     
     public ExecutorService createExecutor() {
         return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, 
"KubernetesConsumer", configuration.getPoolSize());
     }
 
-    public DefaultKubernetesClient getKubernetesClient() {
+    public KubernetesClient getKubernetesClient() {
         return client;
     }
 
@@ -186,10 +186,9 @@ public class KubernetesEndpoint extends DefaultEndpoint {
         return configuration;
     }
 
-    private DefaultKubernetesClient createKubernetesClient() {
+    private KubernetesClient createKubernetesClient() {
         LOG.debug("Create Kubernetes client with the following Configuration: 
" + configuration.toString());
 
-        DefaultKubernetesClient kubeClient = new DefaultKubernetesClient();
         ConfigBuilder builder = new ConfigBuilder();
         builder.withMasterUrl(configuration.getMasterUrl());
         if ((ObjectHelper.isNotEmpty(configuration.getUsername())
@@ -232,8 +231,6 @@ public class KubernetesEndpoint extends DefaultEndpoint {
         }
 
         Config conf = builder.build();
-
-        kubeClient = new DefaultKubernetesClient(conf);
-        return kubeClient;
+        return new DefaultKubernetesClient(conf);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e46a2369/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
new file mode 100644
index 0000000..f81c08d
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.ToServiceConfigurationDefinition;
+import org.apache.camel.model.ToServiceDefinition;
+import org.apache.camel.spi.ProcessorFactory;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.IntrospectionSupport;
+
+public class KubernetesProcessorFactory implements ProcessorFactory {
+
+    @Override
+    public Processor createChildProcessor(RouteContext routeContext, 
ProcessorDefinition<?> definition, boolean mandatory) throws Exception {
+        // not in use
+        return null;
+    }
+
+    @Override
+    public Processor createProcessor(RouteContext routeContext, 
ProcessorDefinition<?> definition) throws Exception {
+        if (definition instanceof ToServiceDefinition) {
+            ToServiceDefinition ts = (ToServiceDefinition) definition;
+
+            // discovery must either not be set, or if set then must be us
+            if (ts.getDiscovery() != null && 
!"kubernetes".equals(ts.getDiscovery())) {
+                return null;
+            }
+
+            String name = ts.getName();
+            String namespace = ts.getNamespace();
+            String uri = ts.getUri();
+            ExchangePattern mep = ts.getPattern();
+
+            ToServiceConfigurationDefinition config = 
ts.getToServiceConfiguration();
+            ToServiceConfigurationDefinition configRef = null;
+            if (ts.getToServiceConfigurationRef() != null) {
+                configRef = 
CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), 
ts.getToServiceConfigurationRef(), ToServiceConfigurationDefinition.class);
+            }
+
+            // extract the properties from the configuration from the model
+            Map<String, Object> parameters = new HashMap<>();
+            if (configRef != null) {
+                IntrospectionSupport.getProperties(configRef, parameters, 
null);
+            }
+            if (config != null) {
+                IntrospectionSupport.getProperties(config, parameters, null);
+            }
+            // and set them on the kubernetes configuration class
+            KubernetesConfiguration kc = new KubernetesConfiguration();
+            IntrospectionSupport.setProperties(kc, parameters);
+
+            return new KubernetesServiceProcessor(name, namespace, uri, mep, 
kc);
+        } else {
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e46a2369/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
new file mode 100644
index 0000000..420cdc3
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.fabric8.kubernetes.api.model.EndpointAddress;
+import io.fabric8.kubernetes.api.model.EndpointPort;
+import io.fabric8.kubernetes.api.model.EndpointSubset;
+import io.fabric8.kubernetes.api.model.Endpoints;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubernetesServiceDiscovery extends ServiceSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesServiceDiscovery.class);
+    private static final int FIRST = 0;
+
+    private String name;
+    private String namespace;
+    private String portName;
+    private KubernetesClient client;
+
+    public KubernetesServiceDiscovery(String name, String namespace, String 
portName, KubernetesClient client) {
+        this.name = name;
+        this.namespace = namespace;
+        this.portName = portName;
+        this.client = client;
+    }
+
+    public List<Server> getUpdatedListOfServers() {
+        Endpoints endpoints = 
client.endpoints().inNamespace(namespace).withName(name).get();
+        List<Server> result = new ArrayList<Server>();
+        if (endpoints != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Found [" + endpoints.getSubsets().size() + "] 
endpoints in namespace [" +
+                        namespace + "] for name [" + name + "] and portName [" 
+ portName + "]");
+            }
+            for (EndpointSubset subset : endpoints.getSubsets()) {
+                if (subset.getPorts().size() == 1) {
+                    EndpointPort port = subset.getPorts().get(FIRST);
+                    for (EndpointAddress address : subset.getAddresses()) {
+                        result.add(new Server(address.getIp(), 
port.getPort()));
+                    }
+                } else {
+                    for (EndpointPort port : subset.getPorts()) {
+                        if (ObjectHelper.isEmpty(portName) || 
portName.endsWith(port.getName())) {
+                            for (EndpointAddress address : 
subset.getAddresses()) {
+                                result.add(new Server(address.getIp(), 
port.getPort()));
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // noop
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (client != null) {
+            IOHelper.close(client);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e46a2369/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java
new file mode 100644
index 0000000..3c9c231
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.RejectedExecutionException;
+
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Traceable;
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubernetesServiceProcessor extends ServiceSupport implements 
AsyncProcessor, Traceable, IdAware {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesServiceProcessor.class);
+
+    private String id;
+    private final String name;
+    private final String namespace;
+    private final String uri;
+    private final ExchangePattern exchangePattern;
+    private final KubernetesConfiguration configuration;
+
+    private KubernetesServiceDiscovery discovery;
+
+    // TODO: allow to plugin custom load balancer like ribbon
+
+    public KubernetesServiceProcessor(String name, String namespace, String 
uri, ExchangePattern exchangePattern, KubernetesConfiguration configuration) {
+        this.name = name;
+        this.namespace = namespace;
+        this.uri = uri;
+        this.exchangePattern = exchangePattern;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // TODO: in try .. catch and the callback stuff
+
+        List<Server> services = discovery.getUpdatedListOfServers();
+        // apply strategy to pick a service
+        if (services.isEmpty()) {
+            exchange.setException(new RejectedExecutionException("No active 
services with name " + name + " in namespace " + namespace));
+        }
+
+        // what strategy to use? random
+        int size = services.size();
+        int ran = new Random().nextInt(size);
+        Server server = services.get(ran);
+
+        String ip = server.getIp();
+        int port = server.getPort();
+
+        LOG.debug("Random selected service {} active at: {}:{}", name, ip, 
port);
+
+        // build uri based on the name
+
+
+        // TODO: lookup service
+        // TODO: apply LB strategy
+        // TODO build uri
+        callback.done(true);
+        return true;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getTraceLabel() {
+        return "kubernetes";
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        discovery = new KubernetesServiceDiscovery(name, namespace, null, 
createKubernetesClient());
+        ServiceHelper.startService(discovery);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(discovery);
+    }
+
+    private KubernetesClient createKubernetesClient() {
+        LOG.debug("Create Kubernetes client with the following Configuration: 
" + configuration.toString());
+
+        ConfigBuilder builder = new ConfigBuilder();
+        builder.withMasterUrl(configuration.getMasterUrl());
+        if ((ObjectHelper.isNotEmpty(configuration.getUsername())
+                && ObjectHelper.isNotEmpty(configuration.getPassword()))
+                && ObjectHelper.isEmpty(configuration.getOauthToken())) {
+            builder.withUsername(configuration.getUsername());
+            builder.withPassword(configuration.getPassword());
+        } else {
+            builder.withOauthToken(configuration.getOauthToken());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getCaCertData())) {
+            builder.withCaCertData(configuration.getCaCertData());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getCaCertFile())) {
+            builder.withCaCertFile(configuration.getCaCertFile());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientCertData())) {
+            builder.withClientCertData(configuration.getClientCertData());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientCertFile())) {
+            builder.withClientCertFile(configuration.getClientCertFile());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getApiVersion())) {
+            builder.withApiVersion(configuration.getApiVersion());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyAlgo())) {
+            builder.withClientKeyAlgo(configuration.getClientKeyAlgo());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyData())) {
+            builder.withClientKeyData(configuration.getClientKeyData());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyFile())) {
+            builder.withClientKeyFile(configuration.getClientKeyFile());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyPassphrase())) {
+            
builder.withClientKeyPassphrase(configuration.getClientKeyPassphrase());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getTrustCerts())) {
+            builder.withTrustCerts(configuration.getTrustCerts());
+        }
+
+        Config conf = builder.build();
+        return new DefaultKubernetesClient(conf);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e46a2369/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java
new file mode 100644
index 0000000..021fc86
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/Server.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+/**
+ * Immutable holder for the ip address and port number of one server.
+ */
+public class Server {
+
+    private final String ip;
+    private final int port;
+
+    /**
+     * @param ip   ip address of the server
+     * @param port port number of the server
+     */
+    public Server(String ip, int port) {
+        this.port = port;
+        this.ip = ip;
+    }
+
+    /**
+     * @return the ip address given at construction
+     */
+    public String getIp() {
+        return this.ip;
+    }
+
+    /**
+     * @return the port number given at construction
+     */
+    public int getPort() {
+        return this.port;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e46a2369/components/camel-kubernetes/src/main/resources/META-INF/services/org/apache/camel/model/ToServiceDefinition
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/resources/META-INF/services/org/apache/camel/model/ToServiceDefinition
 
b/components/camel-kubernetes/src/main/resources/META-INF/services/org/apache/camel/model/ToServiceDefinition
new file mode 100644
index 0000000..acf5be8
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/resources/META-INF/services/org/apache/camel/model/ToServiceDefinition
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.kubernetes.processor.KubernetesProcessorFactory

Reply via email to