Fixed CS
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6cefe5ee Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6cefe5ee Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6cefe5ee Branch: refs/heads/master Commit: 6cefe5eebfe4907116a84067e2c4b522e9d28ed4 Parents: f712e16 Author: Andrea Cosentino <anco...@gmail.com> Authored: Thu Oct 29 17:12:49 2015 +0100 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Thu Oct 29 17:12:49 2015 +0100 ---------------------------------------------------------------------- .../kubernetes/KubernetesConfiguration.java | 51 ++--- .../kubernetes/KubernetesConstants.java | 4 +- .../kubernetes/KubernetesEndpoint.java | 64 +++--- .../consumer/KubernetesPodsConsumer.java | 178 ++++++++-------- ...ubernetesReplicationControllersConsumer.java | 177 ++++++++-------- .../consumer/KubernetesSecretsConsumer.java | 169 ++++++++------- .../consumer/KubernetesServicesConsumer.java | 175 ++++++++-------- .../kubernetes/consumer/common/PodEvent.java | 52 +++-- .../common/ReplicationControllerEvent.java | 52 +++-- .../kubernetes/consumer/common/SecretEvent.java | 52 +++-- .../consumer/common/ServiceEvent.java | 52 +++-- .../KubernetesBuildConfigsProducer.java | 51 ++--- .../producer/KubernetesBuildsProducer.java | 55 ++--- .../producer/KubernetesNodesProducer.java | 49 ++--- .../kubernetes/KubernetesTestSupport.java | 2 +- .../consumer/KubernetesPodsConsumerTest.java | 79 +++---- ...netesReplicationControllersConsumerTest.java | 210 ++++++++----------- .../consumer/KubernetesSecretsConsumerTest.java | 127 +++++------ .../KubernetesServicesConsumerTest.java | 189 ++++++++--------- 19 files changed, 842 insertions(+), 946 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java index 9c5696f..0d3cda4 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java @@ -45,9 +45,9 @@ public class KubernetesConfiguration { @UriParam(label = "producer", enums = "listNamespaces,listNamespacesByLabels,getNamespace,createNamespace,deleteNamespace,listServices,listServicesByLabels,getService,createService," + "deleteService,listReplicationControllers,listReplicationControllersByLabels,getReplicationController,createReplicationController,deleteReplicationController,listPods," - + "listPodsByLabels,getPod,createPod,deletePod,listPersistentVolumes,listPersistentVolumesByLabels,getPersistentVolume,listPersistentVolumesClaims,listPersistentVolumesClaimsByLabels," - + "getPersistentVolumeClaim,createPersistentVolumeClaim,deletePersistentVolumeClaim,listSecrets,listSecretsByLabels,getSecret,createSecret,deleteSecret,listResourcesQuota," - + "listResourcesQuotaByLabels,getResourceQuota,createResourceQuota,deleteResourceQuota,listServiceAccounts,listServiceAccountsByLabels,getServiceAccount,createServiceAccount," + + "listPodsByLabels,getPod,createPod,deletePod,listPersistentVolumes,listPersistentVolumesByLabels,getPersistentVolume,listPersistentVolumesClaims,listPersistentVolumesClaimsByLabels," + + "getPersistentVolumeClaim,createPersistentVolumeClaim,deletePersistentVolumeClaim,listSecrets,listSecretsByLabels,getSecret,createSecret,deleteSecret,listResourcesQuota," + + "listResourcesQuotaByLabels,getResourceQuota,createResourceQuota,deleteResourceQuota,listServiceAccounts,listServiceAccountsByLabels,getServiceAccount,createServiceAccount," + "deleteServiceAccount,listNodes,listNodesByLabels,getNode,listBuilds,listBuildsByLabels,getBuild,listBuildConfigs,listBuildConfigsByLabels,getBuildConfig") private String operation; @@ -83,7 +83,7 @@ public class KubernetesConfiguration { @UriParam private Boolean trustCerts; - + @UriParam(label = "consumer") private String namespaceName; @@ -253,7 +253,7 @@ public class KubernetesConfiguration { } /** - * The Auth Token + * The Auth Token */ public String getOauthToken() { return oauthToken; @@ -277,27 +277,22 @@ public class KubernetesConfiguration { /** * The namespace name */ - public String getNamespaceName() { - return namespaceName; - } - - public void setNamespaceName(String namespaceName) { - this.namespaceName = namespaceName; - } - - @Override - public String toString() { - return "KubernetesConfiguration [masterUrl=" + masterUrl - + ", category=" + category + ", kubernetesClient=" - + kubernetesClient + ", username=" + username + ", password=" - + password + ", operation=" + operation + ", apiVersion=" - + apiVersion + ", caCertData=" + caCertData + ", caCertFile=" - + caCertFile + ", clientCertData=" + clientCertData - + ", clientCertFile=" + clientCertFile + ", clientKeyAlgo=" - + clientKeyAlgo + ", clientKeyData=" + clientKeyData - + ", clientKeyFile=" + clientKeyFile + ", clientKeyPassphrase=" - + clientKeyPassphrase + ", oauthToken=" + oauthToken - + ", trustCerts=" + trustCerts + ", namespaceName=" - + namespaceName + "]"; - } + public String getNamespaceName() { + return namespaceName; + } + + public void setNamespaceName(String namespaceName) { + this.namespaceName = namespaceName; + } + + @Override + public String toString() { + return "KubernetesConfiguration [masterUrl=" + masterUrl + ", category=" + category + ", kubernetesClient=" + + kubernetesClient + ", username=" + username + ", password=" + password + ", operation=" + operation + + ", apiVersion=" + apiVersion + ", caCertData=" + caCertData + ", caCertFile=" + caCertFile + + ", clientCertData=" + clientCertData + ", clientCertFile=" + clientCertFile + ", clientKeyAlgo=" + + clientKeyAlgo + ", clientKeyData=" + clientKeyData + ", clientKeyFile=" + clientKeyFile + + ", clientKeyPassphrase=" + clientKeyPassphrase + ", oauthToken=" + oauthToken + ", trustCerts=" + + trustCerts + ", namespaceName=" + namespaceName + "]"; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConstants.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConstants.java index 6fcad8e..6613f19 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConstants.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConstants.java @@ -17,7 +17,7 @@ package org.apache.camel.component.kubernetes; public interface KubernetesConstants { - // Producer + // Producer String KUBERNETES_OPERATION = "CamelKubernetesOperation"; String KUBERNETES_NAMESPACE_NAME = "CamelKubernetesNamespaceName"; String KUBERNETES_NAMESPACE_LABELS = "CamelKubernetesNamespaceLabels"; @@ -50,7 +50,7 @@ public interface KubernetesConstants { String KUBERNETES_BUILD_NAME = "CamelKubernetesBuildName"; String KUBERNETES_BUILD_CONFIGS_LABELS = "CamelKubernetesBuildConfigsLabels"; String KUBERNETES_BUILD_CONFIG_NAME = "CamelKubernetesBuildConfigName"; - + // Consumer String KUBERNETES_EVENT_ACTION = "CamelKubernetesEventAction"; String KUBERNETES_EVENT_TIMESTAMP = "CamelKubernetesEventTimestamp"; http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java index 3568f9f..8e1f638 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java @@ -27,14 +27,13 @@ import org.apache.camel.component.kubernetes.consumer.KubernetesPodsConsumer; import org.apache.camel.component.kubernetes.consumer.KubernetesReplicationControllersConsumer; import org.apache.camel.component.kubernetes.consumer.KubernetesSecretsConsumer; import org.apache.camel.component.kubernetes.consumer.KubernetesServicesConsumer; +import org.apache.camel.component.kubernetes.producer.KubernetesBuildConfigsProducer; +import org.apache.camel.component.kubernetes.producer.KubernetesBuildsProducer; import org.apache.camel.component.kubernetes.producer.KubernetesNamespacesProducer; +import org.apache.camel.component.kubernetes.producer.KubernetesNodesProducer; import org.apache.camel.component.kubernetes.producer.KubernetesPersistentVolumesClaimsProducer; import org.apache.camel.component.kubernetes.producer.KubernetesPersistentVolumesProducer; import org.apache.camel.component.kubernetes.producer.KubernetesPodsProducer; -import org.apache.camel.component.kubernetes.KubernetesCategory; -import org.apache.camel.component.kubernetes.producer.KubernetesBuildConfigsProducer; -import org.apache.camel.component.kubernetes.producer.KubernetesBuildsProducer; -import org.apache.camel.component.kubernetes.producer.KubernetesNodesProducer; import org.apache.camel.component.kubernetes.producer.KubernetesReplicationControllersProducer; import org.apache.camel.component.kubernetes.producer.KubernetesResourcesQuotaProducer; import org.apache.camel.component.kubernetes.producer.KubernetesSecretsProducer; @@ -50,16 +49,14 @@ import org.slf4j.LoggerFactory; @UriEndpoint(scheme = "kubernetes", title = "Kubernetes", syntax = "kubernetes:master", label = "cloud,paas") public class KubernetesEndpoint extends DefaultEndpoint { - private static final Logger LOG = LoggerFactory - .getLogger(KubernetesEndpoint.class); - + private static final Logger LOG = LoggerFactory.getLogger(KubernetesEndpoint.class); + @UriParam private KubernetesConfiguration configuration; private DefaultKubernetesClient client; - public KubernetesEndpoint(String uri, KubernetesComponent component, - KubernetesConfiguration config) { + public KubernetesEndpoint(String uri, KubernetesComponent component, KubernetesConfiguration config) { super(uri, component); this.configuration = config; } @@ -67,8 +64,7 @@ public class KubernetesEndpoint extends DefaultEndpoint { @Override public Producer createProducer() throws Exception { if (ObjectHelper.isEmpty(configuration.getCategory())) { - throw new IllegalArgumentException( - "A producer category must be specified"); + throw new IllegalArgumentException("A producer category must be specified"); } else { String category = configuration.getCategory(); @@ -94,25 +90,24 @@ public class KubernetesEndpoint extends DefaultEndpoint { case KubernetesCategory.SECRETS: return new KubernetesSecretsProducer(this); - + case KubernetesCategory.RESOURCES_QUOTA: return new KubernetesResourcesQuotaProducer(this); - + case KubernetesCategory.SERVICE_ACCOUNTS: return new KubernetesServiceAccountsProducer(this); - + case KubernetesCategory.NODES: return new KubernetesNodesProducer(this); - + case KubernetesCategory.BUILDS: return new KubernetesBuildsProducer(this); - + case KubernetesCategory.BUILD_CONFIGS: return new KubernetesBuildConfigsProducer(this); - + default: - throw new IllegalArgumentException("The " + category - + " producer category doesn't exist"); + throw new IllegalArgumentException("The " + category + " producer category doesn't exist"); } } } @@ -120,8 +115,7 @@ public class KubernetesEndpoint extends DefaultEndpoint { @Override public Consumer createConsumer(Processor processor) throws Exception { if (ObjectHelper.isEmpty(configuration.getCategory())) { - throw new IllegalArgumentException( - "A consumer category must be specified"); + throw new IllegalArgumentException("A consumer category must be specified"); } else { String category = configuration.getCategory(); @@ -129,19 +123,18 @@ public class KubernetesEndpoint extends DefaultEndpoint { case KubernetesCategory.PODS: return new KubernetesPodsConsumer(this, processor); - + case KubernetesCategory.SERVICES: return new KubernetesServicesConsumer(this, processor); - + case KubernetesCategory.REPLICATION_CONTROLLERS: return new KubernetesReplicationControllersConsumer(this, processor); - + case KubernetesCategory.SECRETS: return new KubernetesSecretsConsumer(this, processor); - + default: - throw new IllegalArgumentException("The " + category - + " consumer category doesn't exist"); + throw new IllegalArgumentException("The " + category + " consumer category doesn't exist"); } } } @@ -154,9 +147,9 @@ public class KubernetesEndpoint extends DefaultEndpoint { @Override protected void doStart() throws Exception { super.doStart(); - - client = configuration.getKubernetesClient() != null ? configuration - .getKubernetesClient() : createKubernetesClient(); + + client = configuration.getKubernetesClient() != null ? configuration.getKubernetesClient() + : createKubernetesClient(); } @Override @@ -169,7 +162,6 @@ public class KubernetesEndpoint extends DefaultEndpoint { return client; } - /** * The kubernetes Configuration */ @@ -179,12 +171,12 @@ public class KubernetesEndpoint extends DefaultEndpoint { private DefaultKubernetesClient createKubernetesClient() { LOG.debug("Create Kubernetes client with the following Configuration: " + configuration.toString()); - + DefaultKubernetesClient kubeClient = new DefaultKubernetesClient(); ConfigBuilder builder = new ConfigBuilder(); builder.withMasterUrl(configuration.getMasterUrl()); - if ((ObjectHelper.isNotEmpty(configuration.getUsername()) && ObjectHelper - .isNotEmpty(configuration.getPassword())) + if ((ObjectHelper.isNotEmpty(configuration.getUsername()) + && ObjectHelper.isNotEmpty(configuration.getPassword())) && ObjectHelper.isEmpty(configuration.getOauthToken())) { builder.withUsername(configuration.getUsername()); builder.withPassword(configuration.getPassword()); @@ -221,9 +213,9 @@ public class KubernetesEndpoint extends DefaultEndpoint { if (ObjectHelper.isNotEmpty(configuration.getTrustCerts())) { builder.withTrustCerts(configuration.getTrustCerts()); } - + Config conf = builder.build(); - + kubeClient = new DefaultKubernetesClient(conf); return kubeClient; } http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java index 9524f71..09ee7e8 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java @@ -16,13 +16,13 @@ */ package org.apache.camel.component.kubernetes.consumer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.kubernetes.KubernetesConstants; @@ -35,98 +35,84 @@ import org.slf4j.LoggerFactory; public class KubernetesPodsConsumer extends ScheduledPollConsumer { - private static final Logger LOG = LoggerFactory - .getLogger(KubernetesPodsConsumer.class); - - private ConcurrentMap<Long, PodEvent> map; - - public KubernetesPodsConsumer(KubernetesEndpoint endpoint, - Processor processor) { - super(endpoint, processor); - } - - @Override - public KubernetesEndpoint getEndpoint() { - return (KubernetesEndpoint) super.getEndpoint(); - } - - @Override - protected void doStart() throws Exception { - super.doStart(); - map = new ConcurrentHashMap<Long, PodEvent>(); - - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration() - .getOauthToken())) { - if (ObjectHelper.isNotEmpty(getEndpoint() - .getKubernetesConfiguration().getNamespaceName())) { - getEndpoint() - .getKubernetesClient() - .pods() - .inNamespace( - getEndpoint().getKubernetesConfiguration() - .getNamespaceName()) - .watch(new Watcher<Pod>() { - - @Override - public void eventReceived( - io.fabric8.kubernetes.client.Watcher.Action action, - Pod resource) { - PodEvent pe = new PodEvent(action, resource); - map.put(System.currentTimeMillis(), pe); - } - - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } - - } - }); - } else { - getEndpoint().getKubernetesClient().pods() - .watch(new Watcher<Pod>() { - - @Override - public void eventReceived( - io.fabric8.kubernetes.client.Watcher.Action action, - Pod resource) { - PodEvent pe = new PodEvent(action, resource); - map.put(System.currentTimeMillis(), pe); - } - - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } - } - }); - } - } - } - - @Override - protected void doStop() throws Exception { - super.doStop(); - map.clear(); - } - - @Override - protected int poll() throws Exception { - int mapSize = map.size(); - for (ConcurrentMap.Entry<Long, PodEvent> entry : map.entrySet()) { - PodEvent podEvent = (PodEvent) entry.getValue(); - Exchange e = getEndpoint().createExchange(); - e.getIn().setBody(podEvent.getPod()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, - podEvent.getAction()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, - entry.getKey()); - getProcessor().process(e); - map.remove(entry.getKey()); - } - return mapSize; - } + private static final Logger LOG = LoggerFactory.getLogger(KubernetesPodsConsumer.class); + + private ConcurrentMap<Long, PodEvent> map; + + public KubernetesPodsConsumer(KubernetesEndpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + @Override + public KubernetesEndpoint getEndpoint() { + return (KubernetesEndpoint) super.getEndpoint(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + map = new ConcurrentHashMap<Long, PodEvent>(); + + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { + getEndpoint().getKubernetesClient().pods() + .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) + .watch(new Watcher<Pod>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, + Pod resource) { + PodEvent pe = new PodEvent(action, resource); + map.put(System.currentTimeMillis(), pe); + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + + } + }); + } else { + getEndpoint().getKubernetesClient().pods().watch(new Watcher<Pod>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Pod resource) { + PodEvent pe = new PodEvent(action, resource); + map.put(System.currentTimeMillis(), pe); + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + } + }); + } + } + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + map.clear(); + } + + @Override + protected int poll() throws Exception { + int mapSize = map.size(); + for (ConcurrentMap.Entry<Long, PodEvent> entry : map.entrySet()) { + PodEvent podEvent = (PodEvent) entry.getValue(); + Exchange e = getEndpoint().createExchange(); + e.getIn().setBody(podEvent.getPod()); + e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, podEvent.getAction()); + e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, entry.getKey()); + getProcessor().process(e); + map.remove(entry.getKey()); + } + return mapSize; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java index 316fe67..4c8d1e8 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java @@ -16,13 +16,13 @@ */ package org.apache.camel.component.kubernetes.consumer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import io.fabric8.kubernetes.api.model.ReplicationController; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.kubernetes.KubernetesConstants; @@ -35,93 +35,88 @@ import org.slf4j.LoggerFactory; public class KubernetesReplicationControllersConsumer extends ScheduledPollConsumer { - private static final Logger LOG = LoggerFactory - .getLogger(KubernetesReplicationControllersConsumer.class); - - private ConcurrentMap<Long, ReplicationControllerEvent> map; - - public KubernetesReplicationControllersConsumer(KubernetesEndpoint endpoint, - Processor processor) { - super(endpoint, processor); - } - - @Override - public KubernetesEndpoint getEndpoint() { - return (KubernetesEndpoint) super.getEndpoint(); - } - - @Override - protected void doStart() throws Exception { - super.doStart(); - map = new ConcurrentHashMap<Long, ReplicationControllerEvent>(); - - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { - getEndpoint().getKubernetesClient().replicationControllers().inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) - .watch(new Watcher<ReplicationController>() { - - @Override - public void eventReceived( - io.fabric8.kubernetes.client.Watcher.Action action, - ReplicationController resource) { - ReplicationControllerEvent rce = new ReplicationControllerEvent(action, resource); - map.put(System.currentTimeMillis(), rce); - - } - - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } - } - - - }); - } else { - getEndpoint().getKubernetesClient().replicationControllers() - .watch(new Watcher<ReplicationController>() { - - - @Override - public void eventReceived( - io.fabric8.kubernetes.client.Watcher.Action action, - ReplicationController resource) { - ReplicationControllerEvent se = new ReplicationControllerEvent(action, resource); - map.put(System.currentTimeMillis(), se); - - } - - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } - } - }); - } - } - } - - @Override - protected void doStop() throws Exception { - super.doStop(); - map.clear(); - } - - @Override - protected int poll() throws Exception { - int mapSize = map.size(); - for (ConcurrentMap.Entry<Long, ReplicationControllerEvent> entry : map.entrySet()) { - ReplicationControllerEvent serviceEvent = (ReplicationControllerEvent) entry.getValue(); - Exchange e = getEndpoint().createExchange(); - e.getIn().setBody(serviceEvent.getReplicationController()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, serviceEvent.getAction()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, entry.getKey()); - getProcessor().process(e); - map.remove(entry.getKey()); - } - return mapSize; - } + private static final Logger LOG = LoggerFactory.getLogger(KubernetesReplicationControllersConsumer.class); + + private ConcurrentMap<Long, ReplicationControllerEvent> map; + + public KubernetesReplicationControllersConsumer(KubernetesEndpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + @Override + public KubernetesEndpoint getEndpoint() { + return (KubernetesEndpoint) super.getEndpoint(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + map = new ConcurrentHashMap<Long, ReplicationControllerEvent>(); + + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { + getEndpoint().getKubernetesClient().replicationControllers() + .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) + .watch(new Watcher<ReplicationController>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, + ReplicationController resource) { + ReplicationControllerEvent rce = new ReplicationControllerEvent(action, resource); + map.put(System.currentTimeMillis(), rce); + + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + } + + }); + } else { + getEndpoint().getKubernetesClient().replicationControllers() + .watch(new Watcher<ReplicationController>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, + ReplicationController resource) { + ReplicationControllerEvent se = new ReplicationControllerEvent(action, resource); + map.put(System.currentTimeMillis(), se); + + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + } + }); + } + } + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + map.clear(); + } + + @Override + protected int poll() throws Exception { + int mapSize = map.size(); + for (ConcurrentMap.Entry<Long, ReplicationControllerEvent> entry : map.entrySet()) { + ReplicationControllerEvent serviceEvent = (ReplicationControllerEvent) entry.getValue(); + Exchange e = getEndpoint().createExchange(); + e.getIn().setBody(serviceEvent.getReplicationController()); + e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, serviceEvent.getAction()); + e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, entry.getKey()); + getProcessor().process(e); + map.remove(entry.getKey()); + } + return mapSize; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java index 783796d..09ab19a 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java @@ -16,13 +16,13 @@ */ package org.apache.camel.component.kubernetes.consumer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import io.fabric8.kubernetes.api.model.Secret; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.kubernetes.KubernetesConstants; @@ -35,89 +35,84 @@ import org.slf4j.LoggerFactory; public class KubernetesSecretsConsumer extends ScheduledPollConsumer { - private static final Logger LOG = LoggerFactory - .getLogger(KubernetesSecretsConsumer.class); - - private ConcurrentMap<Long, SecretEvent> map; - - public KubernetesSecretsConsumer(KubernetesEndpoint endpoint, - Processor processor) { - super(endpoint, processor); - } - - @Override - public KubernetesEndpoint getEndpoint() { - return (KubernetesEndpoint) super.getEndpoint(); - } - - @Override - protected void doStart() throws Exception { - super.doStart(); - map = new ConcurrentHashMap<Long, SecretEvent>(); - - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { - getEndpoint().getKubernetesClient().secrets().inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) - .watch(new Watcher<Secret>() { - - @Override - public void eventReceived( - io.fabric8.kubernetes.client.Watcher.Action action, - Secret resource) { - SecretEvent se = new SecretEvent(action, resource); - map.put(System.currentTimeMillis(), se); - } - - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } - - } - }); - } else { - getEndpoint().getKubernetesClient().secrets() - .watch(new Watcher<Secret>() { - - @Override - public void eventReceived( - io.fabric8.kubernetes.client.Watcher.Action action, - Secret resource) { - SecretEvent se = new SecretEvent(action, resource); - map.put(System.currentTimeMillis(), se); - } - - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } - } - }); - } - } - } - - @Override - protected void doStop() throws Exception { - super.doStop(); - map.clear(); - } - - @Override - protected int poll() throws Exception { - int mapSize = map.size(); - for (ConcurrentMap.Entry<Long, SecretEvent> entry : map.entrySet()) { - SecretEvent podEvent = (SecretEvent) entry.getValue(); - Exchange e = getEndpoint().createExchange(); - e.getIn().setBody(podEvent.getSecret()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, podEvent.getAction()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, entry.getKey()); - getProcessor().process(e); - map.remove(entry.getKey()); - } - return mapSize; - } + private static final Logger LOG = LoggerFactory.getLogger(KubernetesSecretsConsumer.class); + + private ConcurrentMap<Long, SecretEvent> map; + + public KubernetesSecretsConsumer(KubernetesEndpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + @Override + public KubernetesEndpoint getEndpoint() { + return (KubernetesEndpoint) super.getEndpoint(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + map = new ConcurrentHashMap<Long, SecretEvent>(); + + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { + getEndpoint().getKubernetesClient().secrets() + .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) + .watch(new Watcher<Secret>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, + Secret resource) { + SecretEvent se = new SecretEvent(action, resource); + map.put(System.currentTimeMillis(), se); + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + + } + }); + } else { + getEndpoint().getKubernetesClient().secrets().watch(new Watcher<Secret>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Secret resource) { + SecretEvent se = new SecretEvent(action, resource); + map.put(System.currentTimeMillis(), se); + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + } + }); + } + } + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + map.clear(); + } + + @Override + protected int poll() throws Exception { + int mapSize = map.size(); + for (ConcurrentMap.Entry<Long, SecretEvent> entry : map.entrySet()) { + SecretEvent podEvent = (SecretEvent) entry.getValue(); + Exchange e = getEndpoint().createExchange(); + e.getIn().setBody(podEvent.getSecret()); + e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, podEvent.getAction()); + e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, entry.getKey()); + getProcessor().process(e); + map.remove(entry.getKey()); + } + return mapSize; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java index de25388..fc68648 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java @@ -16,13 +16,13 @@ */ package org.apache.camel.component.kubernetes.consumer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import io.fabric8.kubernetes.api.model.Service; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.kubernetes.KubernetesConstants; @@ -35,93 +35,86 @@ import org.slf4j.LoggerFactory; public class KubernetesServicesConsumer extends ScheduledPollConsumer { - private static final Logger LOG = LoggerFactory - .getLogger(KubernetesServicesConsumer.class); - - private ConcurrentMap<Long, ServiceEvent> map; - - public KubernetesServicesConsumer(KubernetesEndpoint endpoint, - Processor processor) { - super(endpoint, processor); - } - - @Override - public KubernetesEndpoint getEndpoint() { - return (KubernetesEndpoint) super.getEndpoint(); - } - - @Override - protected void doStart() throws Exception { - super.doStart(); - map = new ConcurrentHashMap<Long, ServiceEvent>(); - - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { - getEndpoint().getKubernetesClient().services().inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) - .watch(new Watcher<Service>() { - - @Override - public void eventReceived( - io.fabric8.kubernetes.client.Watcher.Action action, - Service resource) { - ServiceEvent se = new ServiceEvent(action, resource); - map.put(System.currentTimeMillis(), se); - - } - - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } - } - - - }); - } else { - getEndpoint().getKubernetesClient().services() - .watch(new Watcher<Service>() { - - - @Override - public void eventReceived( - io.fabric8.kubernetes.client.Watcher.Action action, - Service resource) { - ServiceEvent se = new ServiceEvent(action, resource); - map.put(System.currentTimeMillis(), se); - - } - - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } - } - }); - } - } - } - - @Override - protected void doStop() throws Exception { - super.doStop(); - map.clear(); - } - - @Override - protected int poll() throws Exception { - int mapSize = map.size(); - for (ConcurrentMap.Entry<Long, ServiceEvent> entry : map.entrySet()) { - ServiceEvent serviceEvent = (ServiceEvent) entry.getValue(); - Exchange e = getEndpoint().createExchange(); - e.getIn().setBody(serviceEvent.getService()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, serviceEvent.getAction()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, entry.getKey()); - getProcessor().process(e); - map.remove(entry.getKey()); - } - return mapSize; - } + private static final Logger LOG = LoggerFactory.getLogger(KubernetesServicesConsumer.class); + + private ConcurrentMap<Long, ServiceEvent> map; + + public KubernetesServicesConsumer(KubernetesEndpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + @Override + public KubernetesEndpoint getEndpoint() { + return (KubernetesEndpoint) super.getEndpoint(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + map = new ConcurrentHashMap<Long, ServiceEvent>(); + + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { + getEndpoint().getKubernetesClient().services() + .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) + .watch(new Watcher<Service>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, + Service resource) { + ServiceEvent se = new ServiceEvent(action, resource); + map.put(System.currentTimeMillis(), se); + + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + } + + }); + } else { + getEndpoint().getKubernetesClient().services().watch(new Watcher<Service>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Service resource) { + ServiceEvent se = new ServiceEvent(action, resource); + map.put(System.currentTimeMillis(), se); + + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + } + }); + } + } + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + map.clear(); + } + + @Override + protected int poll() throws Exception { + int mapSize = map.size(); + for (ConcurrentMap.Entry<Long, ServiceEvent> entry : map.entrySet()) { + ServiceEvent serviceEvent = (ServiceEvent) entry.getValue(); + Exchange e = getEndpoint().createExchange(); + e.getIn().setBody(serviceEvent.getService()); + e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, serviceEvent.getAction()); + e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, entry.getKey()); + getProcessor().process(e); + map.remove(entry.getKey()); + } + return mapSize; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/PodEvent.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/PodEvent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/PodEvent.java index 7b87b48..3d0ff43 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/PodEvent.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/PodEvent.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.kubernetes.consumer.common; import io.fabric8.kubernetes.api.model.Pod; @@ -5,28 +21,28 @@ import io.fabric8.kubernetes.client.Watcher.Action; public class PodEvent { private io.fabric8.kubernetes.client.Watcher.Action action; - + private Pod pod; - public PodEvent(Action action, Pod pod) { - super(); - this.action = action; - this.pod = pod; - } + public PodEvent(Action action, Pod pod) { + super(); + this.action = action; + this.pod = pod; + } - public io.fabric8.kubernetes.client.Watcher.Action getAction() { - return action; - } + public io.fabric8.kubernetes.client.Watcher.Action getAction() { + return action; + } - public void setAction(io.fabric8.kubernetes.client.Watcher.Action action) { - this.action = action; - } + public void setAction(io.fabric8.kubernetes.client.Watcher.Action action) { + this.action = action; + } - public Pod getPod() { - return pod; - } + public Pod getPod() { + return pod; + } - public void setPod(Pod pod) { - this.pod = pod; - } + public void setPod(Pod pod) { + this.pod = pod; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ReplicationControllerEvent.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ReplicationControllerEvent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ReplicationControllerEvent.java index d33c945..8a688e5 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ReplicationControllerEvent.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ReplicationControllerEvent.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.kubernetes.consumer.common; import io.fabric8.kubernetes.api.model.ReplicationController; @@ -5,28 +21,28 @@ import io.fabric8.kubernetes.client.Watcher.Action; public class ReplicationControllerEvent { private io.fabric8.kubernetes.client.Watcher.Action action; - + private ReplicationController replicationController; - public ReplicationControllerEvent(Action action, ReplicationController rc) { - super(); - this.action = action; - this.replicationController = rc; - } + public ReplicationControllerEvent(Action action, ReplicationController rc) { + super(); + this.action = action; + this.replicationController = rc; + } - public io.fabric8.kubernetes.client.Watcher.Action getAction() { - return action; - } + public io.fabric8.kubernetes.client.Watcher.Action getAction() { + return action; + } - public void setAction(io.fabric8.kubernetes.client.Watcher.Action action) { - this.action = action; - } + public void setAction(io.fabric8.kubernetes.client.Watcher.Action action) { + this.action = action; + } - public ReplicationController getReplicationController() { - return replicationController; - } + public ReplicationController getReplicationController() { + return replicationController; + } - public void setReplicationController(ReplicationController replicationController) { - this.replicationController = replicationController; - } + public void setReplicationController(ReplicationController replicationController) { + this.replicationController = replicationController; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/SecretEvent.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/SecretEvent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/SecretEvent.java index 048a478..ea25017 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/SecretEvent.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/SecretEvent.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.kubernetes.consumer.common; import io.fabric8.kubernetes.api.model.Secret; @@ -5,28 +21,28 @@ import io.fabric8.kubernetes.client.Watcher.Action; public class SecretEvent { private io.fabric8.kubernetes.client.Watcher.Action action; - + private Secret secret; - public SecretEvent(Action action, Secret secret) { - super(); - this.action = action; - this.secret = secret; - } + public SecretEvent(Action action, Secret secret) { + super(); + this.action = action; + this.secret = secret; + } - public io.fabric8.kubernetes.client.Watcher.Action getAction() { - return action; - } + public io.fabric8.kubernetes.client.Watcher.Action getAction() { + return action; + } - public void setAction(io.fabric8.kubernetes.client.Watcher.Action action) { - this.action = action; - } + public void setAction(io.fabric8.kubernetes.client.Watcher.Action action) { + this.action = action; + } - public Secret getSecret() { - return secret; - } + public Secret getSecret() { + return secret; + } - public void setSecret(Secret secret) { - this.secret = secret; - } + public void setSecret(Secret secret) { + this.secret = secret; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ServiceEvent.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ServiceEvent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ServiceEvent.java index 199ee44..5ca7324 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ServiceEvent.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ServiceEvent.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.kubernetes.consumer.common; import io.fabric8.kubernetes.api.model.Service; @@ -5,28 +21,28 @@ import io.fabric8.kubernetes.client.Watcher.Action; public class ServiceEvent { private io.fabric8.kubernetes.client.Watcher.Action action; - + private Service service; - public ServiceEvent(Action action, Service service) { - super(); - this.action = action; - this.service = service; - } + public ServiceEvent(Action action, Service service) { + super(); + this.action = action; + this.service = service; + } - public io.fabric8.kubernetes.client.Watcher.Action getAction() { - return action; - } + public io.fabric8.kubernetes.client.Watcher.Action getAction() { + return action; + } - public void setAction(io.fabric8.kubernetes.client.Watcher.Action action) { - this.action = action; - } + public void setAction(io.fabric8.kubernetes.client.Watcher.Action action) { + this.action = action; + } - public Service getService() { - return service; - } + public Service getService() { + return service; + } - public void setService(Service service) { - this.service = service; - } + public void setService(Service service) { + this.service = service; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesBuildConfigsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesBuildConfigsProducer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesBuildConfigsProducer.java index e713aa7..8f42efb 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesBuildConfigsProducer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesBuildConfigsProducer.java @@ -36,8 +36,7 @@ import org.slf4j.LoggerFactory; public class KubernetesBuildConfigsProducer extends DefaultProducer { - private static final Logger LOG = LoggerFactory - .getLogger(KubernetesBuildConfigsProducer.class); + private static final Logger LOG = LoggerFactory.getLogger(KubernetesBuildConfigsProducer.class); public KubernetesBuildConfigsProducer(KubernetesEndpoint endpoint) { super(endpoint); @@ -52,13 +51,10 @@ public class KubernetesBuildConfigsProducer extends DefaultProducer { public void process(Exchange exchange) throws Exception { String operation; - if (ObjectHelper.isEmpty(getEndpoint().getKubernetesConfiguration() - .getOperation())) { - operation = exchange.getIn().getHeader( - KubernetesConstants.KUBERNETES_OPERATION, String.class); + if (ObjectHelper.isEmpty(getEndpoint().getKubernetesConfiguration().getOperation())) { + operation = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_OPERATION, String.class); } else { - operation = getEndpoint().getKubernetesConfiguration() - .getOperation(); + operation = getEndpoint().getKubernetesConfiguration().getOperation(); } switch (operation) { @@ -68,7 +64,7 @@ public class KubernetesBuildConfigsProducer extends DefaultProducer { break; case KubernetesOperations.LIST_BUILD_CONFIGS_BY_LABELS_OPERATION: - doListBuildConfigsByLabels(exchange, operation); + doListBuildConfigsByLabels(exchange, operation); break; case KubernetesOperations.GET_BUILD_CONFIG_OPERATION: @@ -76,61 +72,52 @@ public class KubernetesBuildConfigsProducer extends DefaultProducer { break; default: - throw new IllegalArgumentException("Unsupported operation " - + operation); + throw new IllegalArgumentException("Unsupported operation " + operation); } } protected void doList(Exchange exchange, String operation) throws Exception { - BuildConfigList buildConfigsList = getEndpoint() - .getKubernetesClient().adapt(OpenShiftClient.class).buildConfigs().list(); + BuildConfigList buildConfigsList = getEndpoint().getKubernetesClient().adapt(OpenShiftClient.class) + .buildConfigs().list(); exchange.getOut().setBody(buildConfigsList.getItems()); } - protected void doListBuildConfigsByLabels(Exchange exchange, - String operation) throws Exception { - BuildConfigList buildConfigsList = null; - Map<String, String> labels = exchange.getIn().getHeader( - KubernetesConstants.KUBERNETES_BUILD_CONFIGS_LABELS, + protected void doListBuildConfigsByLabels(Exchange exchange, String operation) throws Exception { + BuildConfigList buildConfigsList = null; + Map<String, String> labels = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_BUILD_CONFIGS_LABELS, Map.class); - String namespaceName = exchange.getIn().getHeader( - KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class); + String namespaceName = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class); if (!ObjectHelper.isEmpty(namespaceName)) { ClientNonNamespaceOperation<OpenShiftClient, BuildConfig, BuildConfigList, DoneableBuildConfig, ClientBuildConfigResource<BuildConfig, DoneableBuildConfig, Void, Void>> buildConfigs; buildConfigs = getEndpoint().getKubernetesClient().adapt(OpenShiftClient.class).buildConfigs() .inNamespace(namespaceName); for (Map.Entry<String, String> entry : labels.entrySet()) { - buildConfigs.withLabel(entry.getKey(), entry.getValue()); + buildConfigs.withLabel(entry.getKey(), entry.getValue()); } buildConfigsList = buildConfigs.list(); } else { ClientOperation<OpenShiftClient, BuildConfig, BuildConfigList, DoneableBuildConfig, ClientBuildConfigResource<BuildConfig, DoneableBuildConfig, Void, Void>> buildConfigs; buildConfigs = getEndpoint().getKubernetesClient().adapt(OpenShiftClient.class).buildConfigs(); for (Map.Entry<String, String> entry : labels.entrySet()) { - buildConfigs.withLabel(entry.getKey(), entry.getValue()); + buildConfigs.withLabel(entry.getKey(), entry.getValue()); } buildConfigsList = buildConfigs.list(); } exchange.getOut().setBody(buildConfigsList.getItems()); } - protected void doGetBuildConfig(Exchange exchange, String operation) - throws Exception { + protected void doGetBuildConfig(Exchange exchange, String operation) throws Exception { BuildConfig buildConfig = null; - String buildConfigName = exchange.getIn().getHeader( - KubernetesConstants.KUBERNETES_BUILD_CONFIG_NAME, + String buildConfigName = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_BUILD_CONFIG_NAME, String.class); - String namespaceName = exchange.getIn().getHeader( - KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class); + String namespaceName = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class); if (ObjectHelper.isEmpty(buildConfigName)) { LOG.error("Get a specific Build Config require specify a Build Config name"); - throw new IllegalArgumentException( - "Get a specific Build Config require specify a Build Config name"); + throw new IllegalArgumentException("Get a specific Build Config require specify a Build Config name"); } if (ObjectHelper.isEmpty(namespaceName)) { LOG.error("Get a specific Build Config require specify a namespace name"); - throw new IllegalArgumentException( - "Get a specific Build Config require specify a namespace name"); + throw new IllegalArgumentException("Get a specific Build Config require specify a namespace name"); } buildConfig = getEndpoint().getKubernetesClient().adapt(OpenShiftClient.class).buildConfigs() .inNamespace(namespaceName).withName(buildConfigName).get(); http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesBuildsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesBuildsProducer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesBuildsProducer.java index 41ac943..bc4ef6c 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesBuildsProducer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesBuildsProducer.java @@ -36,8 +36,7 @@ import org.slf4j.LoggerFactory; public class KubernetesBuildsProducer extends DefaultProducer { - private static final Logger LOG = LoggerFactory - .getLogger(KubernetesBuildsProducer.class); + private static final Logger LOG = LoggerFactory.getLogger(KubernetesBuildsProducer.class); public KubernetesBuildsProducer(KubernetesEndpoint endpoint) { super(endpoint); @@ -52,13 +51,10 @@ public class KubernetesBuildsProducer extends DefaultProducer { public void process(Exchange exchange) throws Exception { String operation; - if (ObjectHelper.isEmpty(getEndpoint().getKubernetesConfiguration() - .getOperation())) { - operation = exchange.getIn().getHeader( - KubernetesConstants.KUBERNETES_OPERATION, String.class); + if (ObjectHelper.isEmpty(getEndpoint().getKubernetesConfiguration().getOperation())) { + operation = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_OPERATION, String.class); } else { - operation = getEndpoint().getKubernetesConfiguration() - .getOperation(); + operation = getEndpoint().getKubernetesConfiguration().getOperation(); } switch (operation) { @@ -68,7 +64,7 @@ public class KubernetesBuildsProducer extends DefaultProducer { break; case KubernetesOperations.LIST_BUILD_BY_LABELS_OPERATION: - doListBuildByLabels(exchange, operation); + doListBuildByLabels(exchange, operation); break; case KubernetesOperations.GET_BUILD_OPERATION: @@ -76,64 +72,53 @@ public class KubernetesBuildsProducer extends DefaultProducer { break; default: - throw new IllegalArgumentException("Unsupported operation " - + operation); + throw new IllegalArgumentException("Unsupported operation " + operation); } } protected void doList(Exchange exchange, String operation) throws Exception { - BuildList buildList = getEndpoint() - .getKubernetesClient().adapt(OpenShiftClient.class).builds().list(); + BuildList buildList = getEndpoint().getKubernetesClient().adapt(OpenShiftClient.class).builds().list(); exchange.getOut().setBody(buildList.getItems()); } - protected void doListBuildByLabels(Exchange exchange, - String operation) throws Exception { - BuildList buildList = null; - Map<String, String> labels = exchange.getIn().getHeader( - KubernetesConstants.KUBERNETES_BUILDS_LABELS, + protected void doListBuildByLabels(Exchange exchange, String operation) throws Exception { + BuildList buildList = null; + Map<String, String> labels = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_BUILDS_LABELS, Map.class); - String namespaceName = exchange.getIn().getHeader( - KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class); + String namespaceName = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class); if (!ObjectHelper.isEmpty(namespaceName)) { ClientNonNamespaceOperation<OpenShiftClient, Build, BuildList, DoneableBuild, ClientResource<Build, DoneableBuild>> builds; builds = getEndpoint().getKubernetesClient().adapt(OpenShiftClient.class).builds() .inNamespace(namespaceName); for (Map.Entry<String, String> entry : labels.entrySet()) { - builds.withLabel(entry.getKey(), entry.getValue()); + builds.withLabel(entry.getKey(), entry.getValue()); } buildList = builds.list(); } else { ClientOperation<OpenShiftClient, Build, BuildList, DoneableBuild, ClientResource<Build, DoneableBuild>> builds; builds = getEndpoint().getKubernetesClient().adapt(OpenShiftClient.class).builds(); for (Map.Entry<String, String> entry : labels.entrySet()) { - builds.withLabel(entry.getKey(), entry.getValue()); + builds.withLabel(entry.getKey(), entry.getValue()); } buildList = builds.list(); } exchange.getOut().setBody(buildList.getItems()); } - protected void doGetBuild(Exchange exchange, String operation) - throws Exception { + protected void doGetBuild(Exchange exchange, String operation) throws Exception { Build build = null; - String buildName = exchange.getIn().getHeader( - KubernetesConstants.KUBERNETES_BUILD_NAME, - String.class); - String namespaceName = exchange.getIn().getHeader( - KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class); + String buildName = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_BUILD_NAME, String.class); + String namespaceName = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME, String.class); if (ObjectHelper.isEmpty(buildName)) { LOG.error("Get a specific Build require specify a Build name"); - throw new IllegalArgumentException( - "Get a specific Build require specify a Build name"); + throw new IllegalArgumentException("Get a specific Build require specify a Build name"); } if (ObjectHelper.isEmpty(namespaceName)) { LOG.error("Get a specific Build require specify a namespace name"); - throw new IllegalArgumentException( - "Get a specific Build require specify a namespace name"); + throw new IllegalArgumentException("Get a specific Build require specify a namespace name"); } - build = getEndpoint().getKubernetesClient().adapt(OpenShiftClient.class).builds() - .inNamespace(namespaceName).withName(buildName).get(); + build = getEndpoint().getKubernetesClient().adapt(OpenShiftClient.class).builds().inNamespace(namespaceName) + .withName(buildName).get(); exchange.getOut().setBody(build); } } http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesNodesProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesNodesProducer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesNodesProducer.java index f49aee8..f6114c3 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesNodesProducer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/producer/KubernetesNodesProducer.java @@ -35,8 +35,7 @@ import org.slf4j.LoggerFactory; public class KubernetesNodesProducer extends DefaultProducer { - private static final Logger LOG = LoggerFactory - .getLogger(KubernetesNodesProducer.class); + private static final Logger LOG = LoggerFactory.getLogger(KubernetesNodesProducer.class); public KubernetesNodesProducer(KubernetesEndpoint endpoint) { super(endpoint); @@ -51,13 +50,10 @@ public class KubernetesNodesProducer extends DefaultProducer { public void process(Exchange exchange) throws Exception { String operation; - if (ObjectHelper.isEmpty(getEndpoint().getKubernetesConfiguration() - .getOperation())) { - operation = exchange.getIn().getHeader( - KubernetesConstants.KUBERNETES_OPERATION, String.class); + if (ObjectHelper.isEmpty(getEndpoint().getKubernetesConfiguration().getOperation())) { + operation = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_OPERATION, String.class); } else { - operation = getEndpoint().getKubernetesConfiguration() - .getOperation(); + operation = getEndpoint().getKubernetesConfiguration().getOperation(); } switch (operation) { @@ -75,45 +71,36 @@ public class KubernetesNodesProducer extends DefaultProducer { break; default: - throw new IllegalArgumentException("Unsupported operation " - + operation); + throw new IllegalArgumentException("Unsupported operation " + operation); } } protected void doList(Exchange exchange, String operation) throws Exception { - NodeList nodeList = getEndpoint() - .getKubernetesClient().nodes().list(); + NodeList nodeList = getEndpoint().getKubernetesClient().nodes().list(); exchange.getOut().setBody(nodeList.getItems()); } - protected void doListNodesByLabels(Exchange exchange, - String operation) throws Exception { + protected void doListNodesByLabels(Exchange exchange, String operation) throws Exception { NodeList nodeList = null; - Map<String, String> labels = exchange.getIn().getHeader( - KubernetesConstants.KUBERNETES_NODES_LABELS, - Map.class); - ClientNonNamespaceOperation<KubernetesClient, Node, NodeList, DoneableNode, ClientResource<Node, DoneableNode>> nodes; - nodes = getEndpoint().getKubernetesClient().nodes(); - for (Map.Entry<String, String> entry : labels.entrySet()) { - nodes.withLabel(entry.getKey(), entry.getValue()); - } - nodeList = nodes.list(); + Map<String, String> labels = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_NODES_LABELS, Map.class); + ClientNonNamespaceOperation<KubernetesClient, Node, NodeList, DoneableNode, ClientResource<Node, DoneableNode>> nodes; + nodes = getEndpoint().getKubernetesClient().nodes(); + for (Map.Entry<String, String> entry : labels.entrySet()) { + nodes.withLabel(entry.getKey(), entry.getValue()); + } + nodeList = nodes.list(); exchange.getOut().setBody(nodeList.getItems()); } - protected void doGetNode(Exchange exchange, String operation) - throws Exception { + protected void doGetNode(Exchange exchange, String operation) throws Exception { Node node = null; - String pvName = exchange.getIn().getHeader( - KubernetesConstants.KUBERNETES_NODE_NAME, - String.class); + String pvName = exchange.getIn().getHeader(KubernetesConstants.KUBERNETES_NODE_NAME, String.class); if (ObjectHelper.isEmpty(pvName)) { LOG.error("Get a specific Node require specify a Node name"); - throw new IllegalArgumentException( - "Get a specific Node require specify a Node name"); + throw new IllegalArgumentException("Get a specific Node require specify a Node name"); } node = getEndpoint().getKubernetesClient().nodes().withName(pvName).get(); - + exchange.getOut().setBody(node); } } http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/KubernetesTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/KubernetesTestSupport.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/KubernetesTestSupport.java index 79ed5e8..7af2938 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/KubernetesTestSupport.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/KubernetesTestSupport.java @@ -18,7 +18,7 @@ package org.apache.camel.component.kubernetes; import org.apache.camel.test.junit4.CamelTestSupport; -public class KubernetesTestSupport extends CamelTestSupport{ +public class KubernetesTestSupport extends CamelTestSupport { protected String authToken; protected String host; http://git-wip-us.apache.org/repos/asf/camel/blob/6cefe5ee/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumerTest.java index 08d1929..e2e2f36 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumerTest.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumerTest.java @@ -16,20 +16,15 @@ */ package org.apache.camel.component.kubernetes.consumer; -import io.fabric8.kubernetes.api.model.Container; -import io.fabric8.kubernetes.api.model.ContainerPort; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodSpec; -import io.fabric8.kubernetes.client.Config; -import io.fabric8.kubernetes.client.ConfigBuilder; -import io.fabric8.kubernetes.client.DefaultKubernetesClient; - import java.util.ArrayList; -import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerPort; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodSpec; import org.apache.camel.EndpointInject; import org.apache.camel.Exchange; @@ -39,37 +34,32 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.kubernetes.KubernetesConstants; import org.apache.camel.component.kubernetes.KubernetesTestSupport; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.junit4.CamelTestSupport; import org.apache.camel.util.ObjectHelper; import org.junit.Test; public class KubernetesPodsConsumerTest extends KubernetesTestSupport { - + @EndpointInject(uri = "mock:result") protected MockEndpoint mockResultEndpoint; - @Test public void createAndDeletePod() throws Exception { if (ObjectHelper.isEmpty(authToken)) { return; } - + mockResultEndpoint.expectedMessageCount(3); - mockResultEndpoint.expectedHeaderValuesReceivedInAnyOrder(KubernetesConstants.KUBERNETES_EVENT_ACTION, "ADDED", "MODIFIED", "DELETED"); + mockResultEndpoint.expectedHeaderValuesReceivedInAnyOrder(KubernetesConstants.KUBERNETES_EVENT_ACTION, "ADDED", + "MODIFIED", "DELETED"); Exchange ex = template.request("direct:createPod", new Processor() { @Override public void process(Exchange exchange) throws Exception { - exchange.getIn().setHeader( - KubernetesConstants.KUBERNETES_NAMESPACE_NAME, - "default"); - exchange.getIn().setHeader( - KubernetesConstants.KUBERNETES_POD_NAME, "test"); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME, "default"); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_POD_NAME, "test"); Map<String, String> labels = new HashMap<String, String>(); labels.put("this", "rocks"); - exchange.getIn().setHeader( - KubernetesConstants.KUBERNETES_PODS_LABELS, labels); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_PODS_LABELS, labels); PodSpec podSpec = new PodSpec(); podSpec.setHost("172.28.128.4"); Container cont = new Container(); @@ -91,8 +81,7 @@ public class KubernetesPodsConsumerTest extends KubernetesTestSupport { podSpec.setContainers(list); - exchange.getIn().setHeader( - KubernetesConstants.KUBERNETES_POD_SPEC, podSpec); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_POD_SPEC, podSpec); } }); @@ -104,19 +93,16 @@ public class KubernetesPodsConsumerTest extends KubernetesTestSupport { @Override public void process(Exchange exchange) throws Exception { - exchange.getIn().setHeader( - KubernetesConstants.KUBERNETES_NAMESPACE_NAME, - "default"); - exchange.getIn().setHeader( - KubernetesConstants.KUBERNETES_POD_NAME, "test"); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME, "default"); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_POD_NAME, "test"); } }); boolean podDeleted = ex.getOut().getBody(Boolean.class); assertTrue(podDeleted); - - Thread.sleep(1*1000); + + Thread.sleep(1 * 1000); mockResultEndpoint.assertIsSatisfied(); } @@ -126,33 +112,28 @@ public class KubernetesPodsConsumerTest extends KubernetesTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:list") - .toF("kubernetes://%s?oauthToken=%s&category=pods&operation=listPods", - host, authToken); + from("direct:list").toF("kubernetes://%s?oauthToken=%s&category=pods&operation=listPods", host, + authToken); from("direct:listByLabels") - .toF("kubernetes://%s?oauthToken=%s&category=pods&operation=listPodsByLabels", - host, authToken); - from("direct:getPod") - .toF("kubernetes://%s?oauthToken=%s&category=pods&operation=getPod", - host, authToken); - from("direct:createPod") - .toF("kubernetes://%s?oauthToken=%s&category=pods&operation=createPod", - host, authToken); - from("direct:deletePod") - .toF("kubernetes://%s?oauthToken=%s&category=pods&operation=deletePod", - host, authToken); + .toF("kubernetes://%s?oauthToken=%s&category=pods&operation=listPodsByLabels", host, authToken); + from("direct:getPod").toF("kubernetes://%s?oauthToken=%s&category=pods&operation=getPod", host, + authToken); + from("direct:createPod").toF("kubernetes://%s?oauthToken=%s&category=pods&operation=createPod", host, + authToken); + from("direct:deletePod").toF("kubernetes://%s?oauthToken=%s&category=pods&operation=deletePod", host, + authToken); fromF("kubernetes://%s?oauthToken=%s&category=pods", host, authToken) - .process(new KubernertesProcessor()) - .to(mockResultEndpoint); + .process(new KubernertesProcessor()).to(mockResultEndpoint); } }; } - + public class KubernertesProcessor implements Processor { @Override public void process(Exchange exchange) throws Exception { Message in = exchange.getIn(); - log.info("Got event with body: " + in.getBody() + " and action " + in.getHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION)); + log.info("Got event with body: " + in.getBody() + " and action " + + in.getHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION)); } } }