This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 57fc601efbbd27d70231582dddb277a9bd6eeed6 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Mon Sep 16 08:49:26 2019 +0200 CAMEL-13978 - Create ConfigMap Watch feature in Kubernetes Component --- .../config_maps/KubernetesConfigMapsConsumer.java | 132 +++++++++++++++++++++ .../config_maps/KubernetesConfigMapsEndpoint.java | 2 +- .../kubernetes/consumer/common/ConfigMapEvent.java | 47 ++++++++ 3 files changed, 180 insertions(+), 1 deletion(-) diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java new file mode 100644 index 0000000..bbc489e --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.config_maps; + +import java.util.concurrent.ExecutorService; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.kubernetes.AbstractKubernetesEndpoint; +import org.apache.camel.component.kubernetes.KubernetesConstants; +import org.apache.camel.component.kubernetes.consumer.common.ConfigMapEvent; +import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.util.ObjectHelper; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapList; +import io.fabric8.kubernetes.api.model.DoneableConfigMap; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; +import io.fabric8.kubernetes.client.dsl.Resource; + +public class KubernetesConfigMapsConsumer extends DefaultConsumer { + + private final Processor processor; + private ExecutorService executor; + private ConfigMapsConsumerTask configMapWatcher; + + public KubernetesConfigMapsConsumer(AbstractKubernetesEndpoint endpoint, Processor processor) { + super(endpoint, processor); + this.processor = processor; + } + + @Override + public AbstractKubernetesEndpoint getEndpoint() { + return (AbstractKubernetesEndpoint)super.getEndpoint(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + executor = getEndpoint().createExecutor(); + + configMapWatcher = new ConfigMapsConsumerTask(); + executor.submit(configMapWatcher); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + log.debug("Stopping Kubernetes ConfigMap Consumer"); + if (executor != null) { + if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { + if (configMapWatcher != null) { + configMapWatcher.getWatch().close(); + } + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); + } else { + if (configMapWatcher != null) { + configMapWatcher.getWatch().close(); + } + executor.shutdownNow(); + } + } + executor = null; + } + + class ConfigMapsConsumerTask implements Runnable { + + private Watch watch; + + @Override + public void run() { + NonNamespaceOperation<ConfigMap, ConfigMapList, DoneableConfigMap, Resource<ConfigMap, DoneableConfigMap>> w = getEndpoint().getKubernetesClient().configMaps(); + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelKey()) + && ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelValue())) { + w.withLabel(getEndpoint().getKubernetesConfiguration().getLabelKey(), getEndpoint().getKubernetesConfiguration().getLabelValue()); + } + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getResourceName())) { + w.withName(getEndpoint().getKubernetesConfiguration().getResourceName()); + } + watch = w.watch(new Watcher<ConfigMap>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, ConfigMap resource) { + ConfigMapEvent de = new ConfigMapEvent(action, resource); + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setBody(de.getConfigMap()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, de.getAction()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis()); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during processing", exchange, e); + } + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + log.error(cause.getMessage(), cause); + } + + } + }); + } + + public Watch getWatch() { + return watch; + } + + public void setWatch(Watch watch) { + this.watch = watch; + } + } +} diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsEndpoint.java index 616dbfc..26dc3c9 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsEndpoint.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsEndpoint.java @@ -43,7 +43,7 @@ public class KubernetesConfigMapsEndpoint extends AbstractKubernetesEndpoint { @Override public Consumer createConsumer(Processor processor) throws Exception { - throw new IllegalArgumentException("The kubernetes-configmaps doesn't support consumer"); + return new KubernetesConfigMapsConsumer(this, processor); } } diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ConfigMapEvent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ConfigMapEvent.java new file mode 100644 index 0000000..d225219 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ConfigMapEvent.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.consumer.common; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.client.Watcher.Action; + +public class ConfigMapEvent { + private io.fabric8.kubernetes.client.Watcher.Action action; + + private ConfigMap configMap; + + public ConfigMapEvent(Action action, ConfigMap configMap) { + this.action = action; + this.configMap = configMap; + } + + public io.fabric8.kubernetes.client.Watcher.Action getAction() { + return action; + } + + public void setAction(io.fabric8.kubernetes.client.Watcher.Action action) { + this.action = action; + } + + public ConfigMap getConfigMap() { + return configMap; + } + + public void setConfigMap(ConfigMap configMap) { + this.configMap = configMap; + } +}