CAMEL-9888: Create a camel-consul component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/38d5374a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/38d5374a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/38d5374a Branch: refs/heads/master Commit: 38d5374aaf945ba587d7fcc06de8bac2ef4e0a48 Parents: dba22f9 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Mon May 23 17:15:34 2016 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Mon May 23 17:22:25 2016 +0200 ---------------------------------------------------------------------- apache-camel/pom.xml | 4 + .../src/main/descriptors/common-bin.xml | 1 + components/camel-consul/pom.xml | 120 ++++++++ .../consul/AbstractConsulConsumer.java | 110 +++++++ .../consul/AbstractConsulEndpoint.java | 94 ++++++ .../consul/AbstractConsulProducer.java | 129 ++++++++ .../camel/component/consul/ConsulComponent.java | 65 ++++ .../component/consul/ConsulConfiguration.java | 241 +++++++++++++++ .../camel/component/consul/ConsulConstants.java | 42 +++ .../component/consul/ConsulEndpointFactory.java | 25 ++ .../consul/enpoint/ConsulAgentActions.java | 25 ++ .../consul/enpoint/ConsulAgentEndpoint.java | 43 +++ .../consul/enpoint/ConsulAgentProducer.java | 33 ++ .../consul/enpoint/ConsulEventActions.java | 23 ++ .../consul/enpoint/ConsulEventConsumer.java | 131 ++++++++ .../consul/enpoint/ConsulEventEndpoint.java | 44 +++ .../consul/enpoint/ConsulEventProducer.java | 56 ++++ .../consul/enpoint/ConsulKeyValueActions.java | 30 ++ .../consul/enpoint/ConsulKeyValueConsumer.java | 128 ++++++++ .../consul/enpoint/ConsulKeyValueEndpoint.java | 43 +++ .../consul/enpoint/ConsulKeyValueProducer.java | 126 ++++++++ .../consul/policy/ConsulRoutePolicy.java | 308 +++++++++++++++++++ .../src/main/resources/META-INF/LICENSE.txt | 203 ++++++++++++ .../src/main/resources/META-INF/NOTICE.txt | 11 + .../services/org/apache/camel/component/consul | 1 + .../camel/component/consul/ConsulEventTest.java 
| 68 ++++ .../component/consul/ConsulEventWatchTest.java | 60 ++++ .../component/consul/ConsulKeyValueTest.java | 62 ++++ .../consul/ConsulKeyValueWatchTest.java | 66 ++++ .../component/consul/ConsulTestSupport.java | 64 ++++ .../src/test/resources/log4j.properties | 20 ++ components/pom.xml | 1 + parent/pom.xml | 7 + .../src/main/resources/bundles.properties | 1 + .../features/src/main/resources/features.xml | 6 + 35 files changed, 2391 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/apache-camel/pom.xml ---------------------------------------------------------------------- diff --git a/apache-camel/pom.xml b/apache-camel/pom.xml index 8f8c2f7..a556261 100644 --- a/apache-camel/pom.xml +++ b/apache-camel/pom.xml @@ -165,6 +165,10 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-consul</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-context</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/apache-camel/src/main/descriptors/common-bin.xml ---------------------------------------------------------------------- diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml index 3587254..5745702 100644 --- a/apache-camel/src/main/descriptors/common-bin.xml +++ b/apache-camel/src/main/descriptors/common-bin.xml @@ -54,6 +54,7 @@ <include>org.apache.camel:camel-core</include> <include>org.apache.camel:camel-core-osgi</include> <include>org.apache.camel:camel-cometd</include> + <include>org.apache.camel:camel-consul</include> <include>org.apache.camel:camel-context</include> <include>org.apache.camel:camel-couchdb</include> <include>org.apache.camel:camel-crypto</include> http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/pom.xml 
---------------------------------------------------------------------- diff --git a/components/camel-consul/pom.xml b/components/camel-consul/pom.xml new file mode 100644 index 0000000..9b9dca9 --- /dev/null +++ b/components/camel-consul/pom.xml @@ -0,0 +1,120 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.18-SNAPSHOT</version> + </parent> + + <artifactId>camel-consul</artifactId> + <packaging>jar</packaging> + <name>Camel :: Consul</name> + <description>Camel Consul support</description> + + <properties> + <camel.osgi.export.pkg> + org.apache.camel.component.consul.*, + </camel.osgi.export.pkg> + <camel.osgi.export.service> + org.apache.camel.spi.ComponentResolver;component=consul + </camel.osgi.export.service> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + <dependency> + <groupId>com.orbitz.consul</groupId> + <artifactId>consul-client</artifactId> + <version>${consul-client-version}</version> + </dependency> + + <!-- testing --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>${mockito-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>java-hamcrest</artifactId> + <version>${hamcrest-version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + + <profiles> + <profile> + <id>consul-skip-tests</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + 
<artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>true</skipTests> + </configuration> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>consul-tests</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>false</skipTests> + </configuration> + </plugin> + </plugins> + </build> + </profile> + + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java new file mode 100644 index 0000000..c3b6545 --- /dev/null +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.consul; + +import java.math.BigInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import com.orbitz.consul.Consul; +import org.apache.camel.Processor; +import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.util.ObjectHelper; + +/** + * @author lburgazzoli + */ +public abstract class AbstractConsulConsumer<C> extends DefaultConsumer { + protected final AbstractConsulEndpoint endpoint; + protected final ConsulConfiguration configuration; + protected final String key; + protected final AtomicReference<BigInteger> index; + + private final Function<Consul, C> clientSupplier; + private Runnable watcher; + + protected AbstractConsulConsumer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration, Processor processor, Function<Consul, C> clientSupplier) { + super(endpoint, processor); + + this.endpoint = endpoint; + this.configuration = configuration; + this.key = ObjectHelper.notNull(configuration.getKey(), ConsulConstants.CONSUL_KEY); + this.index = new AtomicReference<>(BigInteger.valueOf(configuration.getFirstIndex())); + this.clientSupplier = clientSupplier; + this.watcher = null; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + watcher = createWatcher(clientSupplier.apply(endpoint.getConsul())); + watcher.run(); + } + + @Override + protected void doStop() throws Exception { + watcher = null; + + super.doStop(); + } + + // ************************************************************************* + // + // ************************************************************************* + + protected abstract Runnable createWatcher(C client) throws Exception; + + // ************************************************************************* + // Handlers + // ************************************************************************* + + protected abstract class AbstractWatcher implements Runnable { + private final C client; 
+ + public AbstractWatcher(C client) { + this.client = client; + } + + protected void onError(Throwable throwable) { + if (isRunAllowed()) { + getExceptionHandler().handleException("Error watching for event " + key, throwable); + } + } + + protected final void setIndex(BigInteger responseIndex) { + index.set(responseIndex); + } + + @Override + public final void run() { + if (isRunAllowed()) { + watch(client); + } + } + + protected final C client() { + return client; + } + + protected final void watch() { + watch(client); + } + + protected abstract void watch(C client); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java new file mode 100644 index 0000000..981c314 --- /dev/null +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.consul; + +import com.orbitz.consul.Consul; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.apache.camel.util.ObjectHelper; + +public abstract class AbstractConsulEndpoint extends DefaultEndpoint { + + @UriPath(description = "The consul configuration") + @Metadata(required = "true") + private final ConsulConfiguration configuration; + + @UriParam(description = "The API endpoint") + @Metadata(required = "true") + private final String apiEndpoint; + + private Consul consul; + + protected AbstractConsulEndpoint(String apiEndpoint, String uri, ConsulComponent component, ConsulConfiguration configuration) { + super(uri, component); + + this.configuration = configuration; + this.apiEndpoint = apiEndpoint; + } + + @Override + public boolean isSingleton() { + return true; + } + + // ************************************************************************* + // + // ************************************************************************* + + public ConsulConfiguration getConfiguration() { + return this.configuration; + } + + public String getApiEndpoint() { + return this.apiEndpoint; + } + + public synchronized Consul getConsul() throws Exception { + if (consul == null) { + Consul.Builder builder = Consul.builder(); + builder.withPing(configuration.isPingInstance()); + + if (ObjectHelper.isNotEmpty(configuration.getUrl())) { + builder.withUrl(configuration.getUrl()); + } + if (ObjectHelper.isNotEmpty(configuration.getSslContextParameters())) { + builder.withSslContext(configuration.getSslContextParameters().createSSLContext(getCamelContext())); + } + if (ObjectHelper.isNotEmpty(configuration.getAclToken())) { + builder.withAclToken(configuration.getAclToken()); + } + if 
(configuration.requiresBasicAuthentication()) { + builder.withBasicAuth(configuration.getUserName(), configuration.getPassword()); + } + if (ObjectHelper.isNotEmpty(configuration.getConnectTimeoutMillis())) { + builder.withConnectTimeoutMillis(configuration.getConnectTimeoutMillis()); + } + if (ObjectHelper.isNotEmpty(configuration.getReadTimeoutMillis())) { + builder.withReadTimeoutMillis(configuration.getReadTimeoutMillis()); + } + if (ObjectHelper.isNotEmpty(configuration.getWriteTimeoutMillis())) { + builder.withWriteTimeoutMillis(configuration.getWriteTimeoutMillis()); + } + + consul = builder.build(); + } + + return consul; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java new file mode 100644 index 0000000..2be260c --- /dev/null +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.consul; + +import java.util.function.Function; + +import com.orbitz.consul.Consul; +import org.apache.camel.Message; +import org.apache.camel.NoSuchHeaderException; +import org.apache.camel.Processor; +import org.apache.camel.impl.HeaderSelectorProducer; + + +public abstract class AbstractConsulProducer<C> extends HeaderSelectorProducer { + private final AbstractConsulEndpoint endpoint; + private final ConsulConfiguration configuration; + private final Function<Consul, C> clientSupplier; + private C client; + + protected AbstractConsulProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration, Function<Consul, C> clientSupplier) { + super(endpoint, ConsulConstants.CONSUL_ACTION, configuration.getAction()); + + this.endpoint = endpoint; + this.configuration = configuration; + this.clientSupplier = clientSupplier; + this.client = null; + } + + // ************************************************************************* + // + // ************************************************************************* + + protected Consul getConsul() throws Exception { + return endpoint.getConsul(); + } + + protected C getClient() throws Exception { + if (client == null) { + client = clientSupplier.apply(getConsul()); + } + + return client; + } + + protected ConsulConfiguration getConfiguration() { + return configuration; + } + + protected <D> D getHeader(Message message, String header, D defaultValue, Class<D> type) { + return message.getHeader(header, defaultValue, type); + } + + protected <D> D getMandatoryHeader(Message message, String header, Class<D> type) throws Exception { + return getMandatoryHeader(message, header, null, type); + } + + protected <D> D getMandatoryHeader(Message message, String header, D defaultValue, Class<D> type) throws Exception { + D value = getHeader(message, header, defaultValue, type); + 
if (value == null) { + throw new NoSuchHeaderException(message.getExchange(), header, type); + } + + return value; + } + + protected String getKey(Message message) { + return message.getHeader( + ConsulConstants.CONSUL_KEY, + configuration.getKey(), + String.class); + } + + protected String getMandatoryKey(Message message) throws Exception { + return getMandatoryHeader( + message, + ConsulConstants.CONSUL_KEY, + configuration.getKey(), + String.class); + } + + protected <T> T getOption(Message message, T defaultValue, Class<T> type) { + return message.getHeader(ConsulConstants.CONSUL_OPTIONS, defaultValue, type); + } + + protected boolean isValueAsString(Message message) throws Exception { + return message.getHeader( + ConsulConstants.CONSUL_VALUE_AS_STRING, + configuration.isValueAsString(), + Boolean.class); + } + + protected <T> T getBody(Message message, T defaultValue, Class<T> type) throws Exception { + T body = message.getBody(type); + if (body == null) { + body = defaultValue; + } + + return body; + } + + protected void setBodyAndResult(Message message, Object body) throws Exception { + setBodyAndResult(message, body, body != null); + } + + protected void setBodyAndResult(Message message, Object body, boolean result) throws Exception { + message.setHeader(ConsulConstants.CONSUL_RESULT, result); + if (body != null) { + message.setBody(body); + } + } + + protected Processor wrap(Function<C, Object> supplier) { + return exchange -> setBodyAndResult(exchange.getIn(), supplier.apply(getClient())); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java new file mode 100644 index 
0000000..bbd6a1c --- /dev/null +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.consul; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.component.consul.enpoint.ConsulAgentEndpoint; +import org.apache.camel.component.consul.enpoint.ConsulEventEndpoint; +import org.apache.camel.component.consul.enpoint.ConsulKeyValueEndpoint; +import org.apache.camel.impl.UriEndpointComponent; + +/** + * Represents the component that manages {@link AbstractConsulEndpoint}. 
+ */ +public class ConsulComponent extends UriEndpointComponent { + + public ConsulComponent() { + super(AbstractConsulEndpoint.class); + } + + public ConsulComponent(CamelContext context) { + super(context, AbstractConsulEndpoint.class); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + ConsulConfiguration configuration = new ConsulConfiguration(); + setProperties(configuration, parameters); + + return ConsulApiEndpoint.valueOf(remaining).create(uri, this, configuration); + } + + private enum ConsulApiEndpoint implements ConsulEndpointFactory { + kv(ConsulKeyValueEndpoint::new), + event(ConsulEventEndpoint::new), + agent(ConsulAgentEndpoint::new); + + private final ConsulEndpointFactory factory; + + ConsulApiEndpoint(ConsulEndpointFactory factory) { + this.factory = factory; + } + + @Override + public Endpoint create(String uri, ConsulComponent component, ConsulConfiguration configuration) throws Exception { + return factory.create(uri, component, configuration); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java new file mode 100644 index 0000000..2c0e576 --- /dev/null +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.consul; + +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriParams; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.jsse.SSLContextParameters; + +@UriParams +public class ConsulConfiguration { + @UriParam + private String url; + + @UriParam(label = "security") + private SSLContextParameters sslContextParameters; + @UriParam(label = "security") + private String aclToken; + @UriParam(label = "security") + private String userName; + @UriParam(label = "security") + private String password; + + @UriParam + private Long connectTimeoutMillis; + @UriParam + private Long readTimeoutMillis; + @UriParam + private Long writeTimeoutMillis; + @UriParam(defaultValue = "true") + private boolean pingInstance = true; + + + @UriParam(label = "producer") + private String action; + + @UriParam(label = "producer,kv", defaultValue = "false") + private boolean valueAsString; + + @UriParam + private String key; + + @UriParam(label = "consumer,watch", defaultValue = "10") + private Integer blockSeconds = 10; + + @UriParam(label = "consumer,watch", defaultValue = "0") + private long firstIndex; + + @UriParam(label = "consumer,watch", defaultValue = "false") + private boolean recursive; + + + public String getUrl() { + return url; + } + + /** + * The Consul agent URL + */ + public void setUrl(String url) { + this.url = url; + } + 
+ public SSLContextParameters getSslContextParameters() { + return sslContextParameters; + } + + /** + * SSL configuration using an org.apache.camel.util.jsse.SSLContextParameters + * instance. + */ + public void setSslContextParameters(SSLContextParameters sslContextParameters) { + this.sslContextParameters = sslContextParameters; + } + + public String getAclToken() { + return aclToken; + } + + /** + * Sets the ACL token to be used with Consul + */ + public void setAclToken(String aclToken) { + this.aclToken = aclToken; + } + + public String getAction() { + return action; + } + + public String getUserName() { + return userName; + } + + /** + * Sets the username to be used for basic authentication + */ + public void setUserName(String userName) { + this.userName = userName; + } + + public String getPassword() { + return password; + } + + /** + * Sets the password to be used for basic authentication + */ + public void setPassword(String password) { + this.password = password; + } + + public boolean requiresBasicAuthentication() { + return ObjectHelper.isNotEmpty(userName) && ObjectHelper.isNotEmpty(password); + } + + public Long getConnectTimeoutMillis() { + return connectTimeoutMillis; + } + + /** + * Connect timeout for OkHttpClient + */ + public void setConnectTimeoutMillis(Long connectTimeoutMillis) { + this.connectTimeoutMillis = connectTimeoutMillis; + } + + public Long getReadTimeoutMillis() { + return readTimeoutMillis; + } + + /** + * Read timeout for OkHttpClient + */ + public void setReadTimeoutMillis(Long readTimeoutMillis) { + this.readTimeoutMillis = readTimeoutMillis; + } + + public Long getWriteTimeoutMillis() { + return writeTimeoutMillis; + } + + /** + * Write timeout for OkHttpClient + */ + public void setWritTeimeoutMillis(Long writeTimeoutMillis) { + this.writeTimeoutMillis = writeTimeoutMillis; + } + + public void setWriteTimeoutMillis(Long writeTimeoutMillis) { + this.writeTimeoutMillis = writeTimeoutMillis; + } + + public boolean 
isPingInstance() { + return pingInstance; + } + + /** + * Configure if the AgentClient should attempt a ping before returning the Consul instance + */ + public void setPingInstance(boolean pingInstance) { + this.pingInstance = pingInstance; + } + + /** + * The default action. Can be overridden by CamelConsulAction + */ + public void setAction(String action) { + this.action = action; + } + + public boolean isValueAsString() { + return valueAsString; + } + + /** + * Default to transform values retrieved from Consul i.e. on KV endpoint to + * string. + */ + public void setValueAsString(boolean valueAsString) { + this.valueAsString = valueAsString; + } + + public String getKey() { + return key; + } + + /** + * The default action. Can be overridden by CamelConsulKey + */ + public void setKey(String key) { + this.key = key; + } + + public Integer getBlockSeconds() { + return blockSeconds; + } + + /** + * The second to wait for a watch event, default 10 seconds + */ + public void setBlockSeconds(Integer blockSeconds) { + this.blockSeconds = blockSeconds; + } + + public long getFirstIndex() { + return firstIndex; + } + + /** + * The first index for watch for, default 0 + */ + public void setFirstIndex(long firstIndex) { + this.firstIndex = firstIndex; + } + + public boolean isRecursive() { + return recursive; + } + + /** + * Recursively watch, default false + */ + public void setRecursive(boolean recursive) { + this.recursive = recursive; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java new file mode 100644 index 0000000..923e1d7 --- /dev/null +++ 
/**
 * String constants shared by the Consul component: the name of the key/value
 * endpoint and the names of the message headers the endpoints read and write.
 */
public interface ConsulConstants {
    String CONSUL_ENDPOINT_KV = "kv";

    String CONSUL_ACTION = "CamelConsulAction";
    String CONSUL_KEY = "CamelConsulKey";
    String CONSUL_EVENT_ID = "CamelConsulEventId";
    String CONSUL_EVENT_NAME = "CamelConsulEventName";
    String CONSUL_EVENT_LTIME = "CamelConsulEventLTime";
    String CONSUL_NODE_FILTER = "CamelConsulNodeFilter";
    String CONSUL_TAG_FILTER = "CamelConsulTagFilter";
    // NOTE(review): the constant is named SERVICE_FILTER but the value says
    // "Session"; this looks like a copy/paste slip. Left unchanged here because
    // renaming a header is visible to routes - confirm before changing.
    String CONSUL_SERVICE_FILTER = "CamelConsulSessionFilter";
    String CONSUL_VERSION = "CamelConsulVersion";
    String CONSUL_VALUE = "CamelConsulValue";
    String CONSUL_VALUES = "CamelConsulValues";
    String CONSUL_FLAGS = "CamelConsulFlags";
    String CONSUL_CREATE_INDEX = "CamelConsulCreateIndex";
    // Must differ from CONSUL_CREATE_INDEX: when both headers are set on the
    // same message, a shared name makes the second value overwrite the first.
    String CONSUL_LOCK_INDEX = "CamelConsulLockIndex";
    String CONSUL_MODIFY_INDEX = "CamelConsulModifyIndex";
    String CONSUL_OPTIONS = "CamelConsulOptions";
    String CONSUL_RESULT = "CamelConsulResult";
    String CONSUL_SESSION = "CamelConsulSession";
    String CONSUL_OPERATION = "CamelConsulOperation";
    String CONSUL_VALUE_AS_STRING = "CamelConsulValueAsString";
}
+ */ +package org.apache.camel.component.consul; + + +import org.apache.camel.Endpoint; + +@FunctionalInterface +public interface ConsulEndpointFactory { + Endpoint create(String uri, ConsulComponent component, ConsulConfiguration configuration) throws Exception; +} http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java new file mode 100644 index 0000000..aff15ab --- /dev/null +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Action names understood by the agent producer.
 */
public interface ConsulAgentActions {
    /** Action name bound to the agent client's checks lookup. */
    String CHECKS = "CHECKS";

    /** Action name bound to the agent client's services lookup. */
    String SERVICES = "SERVICES";

    /** Action name bound to the agent client's members lookup. */
    String MEMBERS = "MEMBERS";

    /** Action name bound to the agent client's agent lookup. */
    String AGENT = "AGENT";
}
+ */ + +package org.apache.camel.component.consul.enpoint; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.consul.AbstractConsulEndpoint; +import org.apache.camel.component.consul.ConsulComponent; +import org.apache.camel.component.consul.ConsulConfiguration; +import org.apache.camel.spi.UriEndpoint; + +@UriEndpoint(scheme = "consul", title = "Consul Agent", syntax = "consul://agent", producerOnly = true, label = "api,cloud") +public class ConsulAgentEndpoint extends AbstractConsulEndpoint { + public ConsulAgentEndpoint(String uri, ConsulComponent component, ConsulConfiguration configuration) { + super("agent", uri, component, configuration); + } + + @Override + public Producer createProducer() throws Exception { + return new ConsulAgentProducer(this, getConfiguration()); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + throw new IllegalArgumentException("Not implemented"); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java new file mode 100644 index 0000000..940095a --- /dev/null +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.consul.enpoint; + +import com.orbitz.consul.AgentClient; +import org.apache.camel.component.consul.AbstractConsulEndpoint; +import org.apache.camel.component.consul.AbstractConsulProducer; +import org.apache.camel.component.consul.ConsulConfiguration; + +public class ConsulAgentProducer extends AbstractConsulProducer<AgentClient> { + ConsulAgentProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration) { + super(endpoint, configuration, c -> c.agentClient()); + + bind(ConsulAgentActions.CHECKS, wrap(c -> c.getChecks())); + bind(ConsulAgentActions.SERVICES, wrap(c -> c.getServices())); + bind(ConsulAgentActions.MEMBERS, wrap(c -> c.getMembers())); + bind(ConsulAgentActions.AGENT, wrap(c -> c.getAgent())); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java new file mode 100644 index 0000000..76e63ec --- /dev/null +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java @@ -0,0 
/**
 * Action names understood by the event producer.
 */
public interface ConsulEventActions {
    /** Action name for firing an event. */
    String FIRE = "FIRE";

    /** Action name for listing events. */
    String LIST = "LIST";
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.consul.enpoint; + +import java.math.BigInteger; +import java.util.List; + +import com.orbitz.consul.EventClient; +import com.orbitz.consul.async.EventResponseCallback; +import com.orbitz.consul.model.EventResponse; +import com.orbitz.consul.model.event.Event; +import com.orbitz.consul.option.QueryOptions; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.component.consul.AbstractConsulConsumer; +import org.apache.camel.component.consul.ConsulConfiguration; +import org.apache.camel.component.consul.ConsulConstants; + +public class ConsulEventConsumer extends AbstractConsulConsumer<EventClient> { + protected ConsulEventConsumer(ConsulEventEndpoint endpoint, ConsulConfiguration configuration, Processor processor) { + super(endpoint, configuration, processor, c -> c.eventClient()); + } + + @Override + protected Runnable createWatcher(EventClient client) throws Exception { + return new EventWatcher(client); + } + + // ************************************************************************* + // Watch + // ************************************************************************* + + private class EventWatcher extends AbstractWatcher implements EventResponseCallback { + EventWatcher(EventClient client) { + super(client); + } + + @Override + public void watch(EventClient 
client) { + client.listEvents( + key, + QueryOptions.blockSeconds(configuration.getBlockSeconds(), index.get()).build(), + this + ); + } + + @Override + public void onComplete(EventResponse eventResponse) { + if (isRunAllowed()) { + List<Event> events = filterEvents(eventResponse.getEvents(), index.get()); + events.forEach(this::onEvent); + + setIndex(eventResponse.getIndex()); + + watch(); + } + } + + @Override + public void onFailure(Throwable throwable) { + onError(throwable); + } + + private void onEvent(Event event) { + final Exchange exchange = endpoint.createExchange(); + final Message message = exchange.getIn(); + + message.setHeader(ConsulConstants.CONSUL_KEY, key); + message.setHeader(ConsulConstants.CONSUL_RESULT, true); + message.setHeader(ConsulConstants.CONSUL_EVENT_ID, event.getId()); + message.setHeader(ConsulConstants.CONSUL_EVENT_NAME, event.getName()); + message.setHeader(ConsulConstants.CONSUL_EVENT_LTIME, event.getLTime()); + message.setHeader(ConsulConstants.CONSUL_NODE_FILTER, event.getNodeFilter()); + message.setHeader(ConsulConstants.CONSUL_SERVICE_FILTER, event.getServiceFilter()); + message.setHeader(ConsulConstants.CONSUL_TAG_FILTER, event.getTagFilter()); + message.setHeader(ConsulConstants.CONSUL_VERSION, event.getVersion()); + message.setBody(event.getPayload().orNull()); + + try { + getProcessor().process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error processing exchange", exchange, e); + } + } + + /** + * from spring-cloud-consul (https://github.com/spring-cloud/spring-cloud-consul): + * spring-cloud-consul-bus/src/main/java/org/springframework/cloud/consul/bus/EventService.java + */ + private List<Event> filterEvents(List<Event> toFilter, BigInteger lastIndex) { + List<Event> events = toFilter; + if (lastIndex != null) { + for (int i = 0; i < events.size(); i++) { + Event event = events.get(i); + BigInteger eventIndex = getEventIndexFromId(event); + if (eventIndex.equals(lastIndex)) { + events = 
events.subList(i + 1, events.size()); + break; + } + } + } + return events; + } + + private BigInteger getEventIndexFromId(Event event) { + String eventId = event.getId(); + String lower = eventId.substring(0, 8) + eventId.substring(9, 13) + eventId.substring(14, 18); + String upper = eventId.substring(19, 23) + eventId.substring(24, 36); + + BigInteger lowVal = new BigInteger(lower, 16); + BigInteger highVal = new BigInteger(upper, 16); + + return lowVal.xor(highVal); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java new file mode 100644 index 0000000..df3254c --- /dev/null +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.component.consul.enpoint; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.consul.AbstractConsulEndpoint; +import org.apache.camel.component.consul.ConsulComponent; +import org.apache.camel.component.consul.ConsulConfiguration; +import org.apache.camel.spi.UriEndpoint; + + +@UriEndpoint(scheme = "consul", title = "Consul KeyValue", syntax = "consul://event", consumerClass = ConsulEventConsumer.class, label = "api,cloud") +public class ConsulEventEndpoint extends AbstractConsulEndpoint { + public ConsulEventEndpoint(String uri, ConsulComponent component, ConsulConfiguration configuration) { + super("event", uri, component, configuration); + } + + @Override + public Producer createProducer() throws Exception { + return new ConsulEventProducer(this, getConfiguration()); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new ConsulEventConsumer(this, getConfiguration(), processor); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java new file mode 100644 index 0000000..26b6595 --- /dev/null +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.consul.enpoint; + +import com.orbitz.consul.EventClient; +import com.orbitz.consul.option.EventOptions; +import com.orbitz.consul.option.QueryOptions; +import org.apache.camel.InvokeOnHeader; +import org.apache.camel.Message; +import org.apache.camel.component.consul.AbstractConsulEndpoint; +import org.apache.camel.component.consul.AbstractConsulProducer; +import org.apache.camel.component.consul.ConsulConfiguration; + +public class ConsulEventProducer extends AbstractConsulProducer<EventClient> { + ConsulEventProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration) { + super(endpoint, configuration, c -> c.eventClient()); + } + + @InvokeOnHeader(ConsulEventActions.FIRE) + protected void fire(Message message) throws Exception { + setBodyAndResult( + message, + getClient().fireEvent( + getMandatoryKey(message), + getOption(message, EventOptions.BLANK, EventOptions.class), + message.getBody(String.class) + ) + ); + } + + @InvokeOnHeader(ConsulEventActions.LIST) + protected void list(Message message) throws Exception { + setBodyAndResult( + message, + getClient().listEvents( + getKey(message), + getOption(message, QueryOptions.BLANK, QueryOptions.class) + ) + ); + } +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java new file mode 100644 index 0000000..d014bbd --- /dev/null +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Action names understood by the key/value producer.
 */
public interface ConsulKeyValueActions {
    /** Store a value. */
    String PUT = "PUT";

    /** Read a single value. */
    String GET_VALUE = "GET_VALUE";

    /** Read several values. */
    String GET_VALUES = "GET_VALUES";

    /** Read keys. */
    String GET_KEYS = "GET_KEYS";

    /** Read the session of a key. */
    String GET_SESSIONS = "GET_SESSIONS";

    /** Delete one key. */
    String DELETE_KEY = "DELETE_KEY";

    /** Delete several keys. */
    String DELETE_KEYS = "DELETE_KEYS";

    /** Acquire a lock. */
    String LOCK = "LOCK";

    /** Release a lock. */
    String UNLOCK = "UNLOCK";
}
+ */ +package org.apache.camel.component.consul.enpoint; + +import java.util.List; + +import com.google.common.base.Optional; +import com.orbitz.consul.KeyValueClient; +import com.orbitz.consul.async.ConsulResponseCallback; +import com.orbitz.consul.model.ConsulResponse; +import com.orbitz.consul.model.kv.Value; +import com.orbitz.consul.option.QueryOptions; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.component.consul.AbstractConsulConsumer; +import org.apache.camel.component.consul.ConsulConfiguration; +import org.apache.camel.component.consul.ConsulConstants; + +public class ConsulKeyValueConsumer extends AbstractConsulConsumer<KeyValueClient> { + + protected ConsulKeyValueConsumer(ConsulKeyValueEndpoint endpoint, ConsulConfiguration configuration, Processor processor) { + super(endpoint, configuration, processor, c -> c.keyValueClient()); + } + + @Override + protected Runnable createWatcher(KeyValueClient client) throws Exception { + return configuration.isRecursive() ? 
new RecursivePathWatcher(client) : new PathWatcher(client); + } + + // ************************************************************************* + // Watch + // ************************************************************************* + + private abstract class AbstractPathWatcher<T> extends AbstractWatcher implements ConsulResponseCallback<T> { + protected AbstractPathWatcher(KeyValueClient client) { + super(client); + } + + protected QueryOptions queryOptions() { + return QueryOptions.blockSeconds(configuration.getBlockSeconds(), index.get()).build(); + } + + @Override + public void onComplete(ConsulResponse<T> consulResponse) { + if (isRunAllowed()) { + onResponse(consulResponse.getResponse()); + setIndex(consulResponse.getIndex()); + watch(); + } + } + + @Override + public void onFailure(Throwable throwable) { + onError(throwable); + } + + protected void onValue(Value value) { + final Exchange exchange = endpoint.createExchange(); + final Message message = exchange.getIn(); + + message.setHeader(ConsulConstants.CONSUL_KEY, value.getKey()); + message.setHeader(ConsulConstants.CONSUL_RESULT, true); + message.setHeader(ConsulConstants.CONSUL_FLAGS, value.getFlags()); + message.setHeader(ConsulConstants.CONSUL_CREATE_INDEX, value.getCreateIndex()); + message.setHeader(ConsulConstants.CONSUL_LOCK_INDEX, value.getLockIndex()); + message.setHeader(ConsulConstants.CONSUL_MODIFY_INDEX, value.getModifyIndex()); + message.setHeader(ConsulConstants.CONSUL_SESSION, value.getSession().orNull()); + message.setBody(configuration.isValueAsString() ? 
value.getValueAsString().orNull() : value.getValue().orNull()); + + try { + getProcessor().process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error processing exchange", exchange, e); + } + } + + protected abstract void onResponse(T consulResponse); + } + + private class PathWatcher extends AbstractPathWatcher<Optional<Value>> { + PathWatcher(KeyValueClient client) { + super(client); + } + + @Override + public void watch(KeyValueClient client) { + client.getValue(key, queryOptions(), this); + } + + @Override + public void onResponse(Optional<Value> value) { + if (value.isPresent()) { + onValue(value.get()); + } + } + } + + private class RecursivePathWatcher extends AbstractPathWatcher<List<Value>> { + RecursivePathWatcher(KeyValueClient client) { + super(client); + } + + @Override + public void watch(KeyValueClient client) { + client.getValues(key, queryOptions(), this); + } + + @Override + public void onResponse(List<Value> values) { + values.forEach(this::onValue); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java new file mode 100644 index 0000000..2910910 --- /dev/null +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.consul.enpoint; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.consul.AbstractConsulEndpoint; +import org.apache.camel.component.consul.ConsulComponent; +import org.apache.camel.component.consul.ConsulConfiguration; +import org.apache.camel.spi.UriEndpoint; + +@UriEndpoint(scheme = "consul", title = "Consul KeyValue", syntax = "consul://kv", consumerClass = ConsulKeyValueConsumer.class, label = "api,cloud") +public class ConsulKeyValueEndpoint extends AbstractConsulEndpoint { + public ConsulKeyValueEndpoint(String uri, ConsulComponent component, ConsulConfiguration configuration) { + super("kv", uri, component, configuration); + } + + @Override + public Producer createProducer() throws Exception { + return new ConsulKeyValueProducer(this, getConfiguration()); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new ConsulKeyValueConsumer(this, getConfiguration(), processor); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java ---------------------------------------------------------------------- diff --git 
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java new file mode 100644 index 0000000..9596f4c --- /dev/null +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.camel.component.consul.enpoint;

import com.orbitz.consul.KeyValueClient;
import com.orbitz.consul.option.PutOptions;
import com.orbitz.consul.option.QueryOptions;
import org.apache.camel.InvokeOnHeader;
import org.apache.camel.Message;
import org.apache.camel.component.consul.AbstractConsulEndpoint;
import org.apache.camel.component.consul.AbstractConsulProducer;
import org.apache.camel.component.consul.ConsulConfiguration;
import org.apache.camel.component.consul.ConsulConstants;

/**
 * Producer that maps the actions declared in {@link ConsulKeyValueActions}
 * onto calls to the Consul {@link KeyValueClient}.
 *
 * <p>Each handler is bound to its action through {@link InvokeOnHeader}. The
 * key to operate on is always obtained through {@code getMandatoryKey}.
 */
public class ConsulKeyValueProducer extends AbstractConsulProducer<KeyValueClient> {

    ConsulKeyValueProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration) {
        super(endpoint, configuration, consul -> consul.keyValueClient());
    }

    /**
     * Writes the message body (as a String) under the mandatory key and stores
     * the client's return value in the {@link ConsulConstants#CONSUL_RESULT} header.
     *
     * <p>Flags are read from the {@link ConsulConstants#CONSUL_FLAGS} header
     * (default {@code 0}); put options default to {@link PutOptions#BLANK}.
     */
    @InvokeOnHeader(ConsulKeyValueActions.PUT)
    protected void put(Message message) throws Exception {
        final KeyValueClient client = getClient();
        final String key = getMandatoryKey(message);
        final String value = message.getBody(String.class);
        final Long flags = message.getHeader(ConsulConstants.CONSUL_FLAGS, 0L, Long.class);
        final PutOptions options = getOption(message, PutOptions.BLANK, PutOptions.class);

        message.setHeader(ConsulConstants.CONSUL_RESULT, client.putValue(key, value, flags, options));
    }

    /**
     * Looks up a single entry for the mandatory key and passes it (or
     * {@code null} when absent) to {@code setBodyAndResult}.
     *
     * <p>When {@code isValueAsString} holds, the plain String value is fetched;
     * otherwise the full value object is fetched using the query options
     * (default {@link QueryOptions#BLANK}).
     */
    @InvokeOnHeader(ConsulKeyValueActions.GET_VALUE)
    protected void getValue(Message message) throws Exception {
        final Object result = isValueAsString(message)
            ? getClient().getValueAsString(getMandatoryKey(message)).orNull()
            : getClient().getValue(
                getMandatoryKey(message),
                getOption(message, QueryOptions.BLANK, QueryOptions.class)).orNull();

        setBodyAndResult(message, result);
    }

    /**
     * Looks up the entries for the mandatory key and passes them to
     * {@code setBodyAndResult}, either as Strings or as full value objects
     * depending on {@code isValueAsString}.
     */
    @InvokeOnHeader(ConsulKeyValueActions.GET_VALUES)
    protected void getValues(Message message) throws Exception {
        final Object result = isValueAsString(message)
            ? getClient().getValuesAsString(getMandatoryKey(message))
            : getClient().getValues(
                getMandatoryKey(message),
                getOption(message, QueryOptions.BLANK, QueryOptions.class));

        setBodyAndResult(message, result);
    }

    /**
     * Fetches the keys for the mandatory key and passes them to
     * {@code setBodyAndResult}.
     */
    @InvokeOnHeader(ConsulKeyValueActions.GET_KEYS)
    protected void getKeys(Message message) throws Exception {
        final KeyValueClient client = getClient();
        final String key = getMandatoryKey(message);

        setBodyAndResult(message, client.getKeys(key));
    }

    /**
     * Fetches the session associated with the mandatory key and passes it to
     * {@code setBodyAndResult}.
     */
    @InvokeOnHeader(ConsulKeyValueActions.GET_SESSIONS)
    protected void getSessions(Message message) throws Exception {
        final KeyValueClient client = getClient();
        final String key = getMandatoryKey(message);

        setBodyAndResult(message, client.getSession(key));
    }

    /**
     * Deletes the mandatory key and sets the
     * {@link ConsulConstants#CONSUL_RESULT} header to {@code true}.
     */
    @InvokeOnHeader(ConsulKeyValueActions.DELETE_KEY)
    protected void deleteKey(Message message) throws Exception {
        final KeyValueClient client = getClient();
        client.deleteKey(getMandatoryKey(message));

        message.setHeader(ConsulConstants.CONSUL_RESULT, true);
    }

    /**
     * Invokes {@code deleteKeys} for the mandatory key and sets the
     * {@link ConsulConstants#CONSUL_RESULT} header to {@code true}.
     */
    @InvokeOnHeader(ConsulKeyValueActions.DELETE_KEYS)
    protected void deleteKeys(Message message) throws Exception {
        final KeyValueClient client = getClient();
        client.deleteKeys(getMandatoryKey(message));

        message.setHeader(ConsulConstants.CONSUL_RESULT, true);
    }

    /**
     * Tries to acquire a lock on the mandatory key and stores the outcome in
     * the {@link ConsulConstants#CONSUL_RESULT} header.
     *
     * <p>The session is read from the {@link ConsulConstants#CONSUL_SESSION}
     * header and defaults to an empty string when the header is missing.
     */
    @InvokeOnHeader(ConsulKeyValueActions.LOCK)
    protected void lock(Message message) throws Exception {
        final KeyValueClient client = getClient();
        final String key = getMandatoryKey(message);
        final String value = getBody(message, null, String.class);
        final String session = message.getHeader(ConsulConstants.CONSUL_SESSION, "", String.class);

        message.setHeader(ConsulConstants.CONSUL_RESULT, client.acquireLock(key, value, session));
    }

    /**
     * Releases the lock on the mandatory key and stores the outcome in the
     * {@link ConsulConstants#CONSUL_RESULT} header.
     *
     * <p>Unlike {@code lock}, the {@link ConsulConstants#CONSUL_SESSION}
     * header is mandatory here.
     */
    @InvokeOnHeader(ConsulKeyValueActions.UNLOCK)
    protected void unlock(Message message) throws Exception {
        final KeyValueClient client = getClient();
        final String key = getMandatoryKey(message);
        final String session = getMandatoryHeader(message, ConsulConstants.CONSUL_SESSION, String.class);

        message.setHeader(ConsulConstants.CONSUL_RESULT, client.releaseLock(key, session));
    }
}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java new
file mode 100644 index 0000000..0b44e86 --- /dev/null +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.consul.policy; + +import java.math.BigInteger; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.base.Optional; +import com.orbitz.consul.Consul; +import com.orbitz.consul.KeyValueClient; +import com.orbitz.consul.SessionClient; +import com.orbitz.consul.async.ConsulResponseCallback; +import com.orbitz.consul.model.ConsulResponse; +import com.orbitz.consul.model.kv.Value; +import com.orbitz.consul.model.session.ImmutableSession; +import com.orbitz.consul.option.QueryOptions; +import org.apache.camel.Exchange; +import org.apache.camel.NonManagedService; +import org.apache.camel.Route; +import org.apache.camel.support.RoutePolicySupport; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; + +public class ConsulRoutePolicy extends RoutePolicySupport implements NonManagedService { + private static final Logger LOGGER = LoggerFactory.getLogger(ConsulRoutePolicy.class); + + private final Object lock; + private final Consul consul; + private final SessionClient sessionClient; + private final KeyValueClient keyValueClient; + private final AtomicBoolean leader; + private final Set<Route> suspendedRoutes; + private final AtomicReference<BigInteger> index; + + private String serviceName; + private String servicePath; + private int ttl; + private int lockDelay; + private ExecutorService executorService; + private boolean shouldStopConsumer; + + private String sessionId; + + public ConsulRoutePolicy() { + this(Consul.builder().build()); + } + + public ConsulRoutePolicy(Consul consul) { + this.consul = consul; + this.sessionClient = consul.sessionClient(); + this.keyValueClient = consul.keyValueClient(); + this.suspendedRoutes = new HashSet<>(); + this.leader = new AtomicBoolean(false); + this.lock = new Object(); + this.index = new AtomicReference<>(BigInteger.valueOf(0)); + this.serviceName = null; + this.servicePath = null; + this.ttl = 60; + this.lockDelay = 10; + this.executorService = null; + this.shouldStopConsumer = true; + this.sessionId = null; + } + + @Override + public void onExchangeBegin(Route route, Exchange exchange) { + if (leader.get()) { + if (shouldStopConsumer) { + startConsumer(route); + } + } else { + if (shouldStopConsumer) { + stopConsumer(route); + } + + exchange.setException(new IllegalStateException( + "Consul based route policy prohibits processing exchanges, stopping route and failing the exchange") + ); + } + } + + @Override + public void onStop(Route route) { + synchronized (lock) { + suspendedRoutes.remove(route); + } + } + + @Override + public synchronized void onSuspend(Route route) { + synchronized (lock) { + suspendedRoutes.remove(route); + } + } + + @Override + protected void doStart() 
throws Exception { + if (sessionId == null) { + sessionId = sessionClient.createSession( + ImmutableSession.builder() + .name(serviceName) + .ttl(ttl + "s") + .lockDelay(lockDelay + "s") + .build() + ).getId(); + + LOGGER.debug("SessionID = {}", sessionId); + if (executorService == null) { + executorService = Executors.newSingleThreadExecutor(); + } + + setLeader(keyValueClient.acquireLock(servicePath, sessionId)); + + executorService.submit(new Watcher()); + } + + super.doStart(); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + if (sessionId != null) { + sessionClient.destroySession(sessionId); + sessionId = null; + + if (executorService != null) { + executorService.shutdown(); + executorService.awaitTermination(ttl / 3, TimeUnit.SECONDS); + } + } + } + + // ************************************************************************* + // + // ************************************************************************* + + protected void setLeader(boolean isLeader) { + if (isLeader && leader.compareAndSet(false, isLeader)) { + LOGGER.debug("Leadership taken ({}, {})", serviceName, sessionId); + startAllStoppedConsumers(); + } else { + if (!leader.getAndSet(isLeader) && isLeader) { + LOGGER.debug("Leadership lost ({}, {})", serviceName, sessionId); + } + } + } + + private void startConsumer(Route route) { + synchronized (lock) { + try { + if (suspendedRoutes.contains(route)) { + startConsumer(route.getConsumer()); + suspendedRoutes.remove(route); + } + } catch (Exception e) { + handleException(e); + } + } + } + + private void stopConsumer(Route route) { + synchronized (lock) { + try { + if (!suspendedRoutes.contains(route)) { + LOGGER.debug("Stopping consumer for {} ({})", route.getId(), route.getConsumer()); + stopConsumer(route.getConsumer()); + suspendedRoutes.add(route); + } + } catch (Exception e) { + handleException(e); + } + } + } + + private void startAllStoppedConsumers() { + synchronized (lock) { + try { + for (Route route : 
suspendedRoutes) { + LOGGER.debug("Starting consumer for {} ({})", route.getId(), route.getConsumer()); + startConsumer(route.getConsumer()); + } + + suspendedRoutes.clear(); + } catch (Exception e) { + handleException(e); + } + } + } + + // ************************************************************************* + // Getter/Setters + // ************************************************************************* + + public Consul getConsul() { + return consul; + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + this.servicePath = String.format("/service/%s/leader", serviceName); + } + + public int getTtl() { + return ttl; + } + + public void setTtl(int ttl) { + this.ttl = ttl > 10 ? ttl : 10; + } + + public int getLockDelay() { + return lockDelay; + } + + public void setLockDelay(int lockDelay) { + this.lockDelay = lockDelay > 10 ? lockDelay : 10; + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + public boolean isShouldStopConsumer() { + return shouldStopConsumer; + } + + public void setShouldStopConsumer(boolean shouldStopConsumer) { + this.shouldStopConsumer = shouldStopConsumer; + } + + // ************************************************************************* + // Watch + // ************************************************************************* + + private class Watcher implements Runnable, ConsulResponseCallback<Optional<Value>> { + + @Override + public void onComplete(ConsulResponse<Optional<Value>> consulResponse) { + if (isRunAllowed()) { + Value response = consulResponse.getResponse().orNull(); + if (response != null) { + String sid = response.getSession().orNull(); + if (ObjectHelper.isEmpty(sid)) { + // If the key is not held by any session, try acquire a + // lock (become leader) + 
LOGGER.debug("Try to take leadership ..."); + setLeader(keyValueClient.acquireLock(servicePath, sessionId)); + } else if (!sessionId.equals(sid) && leader.get()) { + // Looks like I've lost leadership + setLeader(false); + } + } + + index.set(consulResponse.getIndex()); + run(); + } + } + + @Override + public void onFailure(Throwable throwable) { + handleException(throwable); + } + + @Override + public void run() { + if (isRunAllowed()) { + // Refresh session + sessionClient.renewSession(sessionId); + + keyValueClient.getValue( + servicePath, + QueryOptions.blockSeconds(ttl / 3, index.get()).build(), + this); + } + } + } +}