CAMEL-10715: service-call : create ZooKeeper based ServiceDiscovery
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e6a7caec Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e6a7caec Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e6a7caec Branch: refs/heads/master Commit: e6a7caecc50621de4c9368d70e17f9ab5eba01e4 Parents: e59b00b Author: lburgazzoli <lburgazz...@gmail.com> Authored: Wed Jun 28 12:54:34 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Wed Jun 28 15:03:33 2017 +0200 ---------------------------------------------------------------------- .../ServiceCallConfigurationDefinition.java | 20 +- .../model/cloud/ServiceCallDefinition.java | 20 +- ...erviceCallServiceDiscoveryConfiguration.java | 193 +++++++++++++++ components/camel-zookeeper/pom.xml | 11 +- .../ZooKeeperCuratorConfiguration.java | 247 +++++++++++++++++++ .../zookeeper/ZooKeeperCuratorHelper.java | 71 ++++++ .../cloud/ZooKeeperServiceDiscovery.java | 144 +++++++++++ .../cloud/ZooKeeperServiceDiscoveryFactory.java | 225 +++++++++++++++++ .../zookeeper/ha/ZooKeeperClusterService.java | 196 +++++++-------- .../zookeeper/ha/ZooKeeperClusterView.java | 7 +- .../camel/cloud/zookeeper-service-discovery | 17 ++ .../SpringZooKeeperServiceCallRouteTest.java | 128 ++++++++++ .../cloud/ZooKeeperServiceCallRouteTest.java | 150 +++++++++++ .../cloud/ZooKeeperServiceDiscoveryTest.java | 111 +++++++++ .../ha/ZooKeeperClusteredRoutePolicyTest.java | 4 +- .../SpringZooKeeperServiceCallRouteTest.xml | 65 +++++ parent/pom.xml | 2 +- .../features/src/main/resources/features.xml | 3 + 18 files changed, 1493 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallConfigurationDefinition.java ---------------------------------------------------------------------- diff --git 
a/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallConfigurationDefinition.java b/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallConfigurationDefinition.java index d74b3a8..abd1eb4 100644 --- a/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallConfigurationDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallConfigurationDefinition.java @@ -74,7 +74,8 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType { @XmlElement(name = "dnsServiceDiscovery", type = DnsServiceCallServiceDiscoveryConfiguration.class), @XmlElement(name = "etcdServiceDiscovery", type = EtcdServiceCallServiceDiscoveryConfiguration.class), @XmlElement(name = "kubernetesServiceDiscovery", type = KubernetesServiceCallServiceDiscoveryConfiguration.class), - @XmlElement(name = "staticServiceDiscovery", type = StaticServiceCallServiceDiscoveryConfiguration.class)} + @XmlElement(name = "staticServiceDiscovery", type = StaticServiceCallServiceDiscoveryConfiguration.class), + @XmlElement(name = "zookeeperServiceDiscovery", type = ZooKeeperServiceCallServiceDiscoveryConfiguration.class)} ) private ServiceCallServiceDiscoveryConfiguration serviceDiscoveryConfiguration; @@ -562,6 +563,23 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType { return this; } + public ZooKeeperServiceCallServiceDiscoveryConfiguration zookeeperServiceDiscovery() { + ZooKeeperServiceCallServiceDiscoveryConfiguration conf = new ZooKeeperServiceCallServiceDiscoveryConfiguration(); + setServiceDiscoveryConfiguration(conf); + + return conf; + } + + public ServiceCallConfigurationDefinition zookeeperServiceDiscovery(String nodes, String basePath) { + ZooKeeperServiceCallServiceDiscoveryConfiguration conf = new ZooKeeperServiceCallServiceDiscoveryConfiguration(); + conf.setNodes(nodes); + conf.setBasePath(basePath); + + setServiceDiscoveryConfiguration(conf); + + return this; + } + // ***************************** 
// Shortcuts - ServiceFilter // ***************************** http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallDefinition.java b/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallDefinition.java index 283d701..74ee07d 100644 --- a/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/cloud/ServiceCallDefinition.java @@ -103,7 +103,8 @@ public class ServiceCallDefinition extends NoOutputDefinition<ServiceCallDefinit @XmlElement(name = "dnsServiceDiscovery", type = DnsServiceCallServiceDiscoveryConfiguration.class), @XmlElement(name = "etcdServiceDiscovery", type = EtcdServiceCallServiceDiscoveryConfiguration.class), @XmlElement(name = "kubernetesServiceDiscovery", type = KubernetesServiceCallServiceDiscoveryConfiguration.class), - @XmlElement(name = "staticServiceDiscovery", type = StaticServiceCallServiceDiscoveryConfiguration.class)} + @XmlElement(name = "staticServiceDiscovery", type = StaticServiceCallServiceDiscoveryConfiguration.class), + @XmlElement(name = "zookeeperServiceDiscovery", type = ZooKeeperServiceCallServiceDiscoveryConfiguration.class)} ) private ServiceCallServiceDiscoveryConfiguration serviceDiscoveryConfiguration; @@ -651,6 +652,23 @@ public class ServiceCallDefinition extends NoOutputDefinition<ServiceCallDefinit return conf; } + public ZooKeeperServiceCallServiceDiscoveryConfiguration zookeeperServiceDiscovery() { + ZooKeeperServiceCallServiceDiscoveryConfiguration conf = new ZooKeeperServiceCallServiceDiscoveryConfiguration(this); + setServiceDiscoveryConfiguration(conf); + + return conf; + } + + public ServiceCallDefinition zookeeperServiceDiscovery(String nodes, String basePath) { + 
ZooKeeperServiceCallServiceDiscoveryConfiguration conf = new ZooKeeperServiceCallServiceDiscoveryConfiguration(this); + conf.setNodes(nodes); + conf.setBasePath(basePath); + + setServiceDiscoveryConfiguration(conf); + + return this; + } + // ***************************** // Shortcuts - ServiceFilter // ***************************** http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/camel-core/src/main/java/org/apache/camel/model/cloud/ZooKeeperServiceCallServiceDiscoveryConfiguration.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/cloud/ZooKeeperServiceCallServiceDiscoveryConfiguration.java b/camel-core/src/main/java/org/apache/camel/model/cloud/ZooKeeperServiceCallServiceDiscoveryConfiguration.java new file mode 100644 index 0000000..3bc9ecb --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/model/cloud/ZooKeeperServiceCallServiceDiscoveryConfiguration.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.model.cloud; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.camel.spi.Metadata; + +@Metadata(label = "routing,cloud,service-discovery") +@XmlRootElement(name = "zookeeperServiceDiscovery") +@XmlAccessorType(XmlAccessType.FIELD) +public class ZooKeeperServiceCallServiceDiscoveryConfiguration extends ServiceCallServiceDiscoveryConfiguration { + @XmlAttribute(required = true) + private String nodes; + @XmlAttribute + private String namespace; + @XmlAttribute + private String reconnectBaseSleepTime; + @XmlAttribute + private String reconnectMaxSleepTime; + @XmlAttribute + private Integer reconnectMaxRetries; + @XmlAttribute + private String sessionTimeout; + @XmlAttribute + private String connectionTimeout; + @XmlAttribute(required = true) + private String basePath; + + + public ZooKeeperServiceCallServiceDiscoveryConfiguration() { + this(null); + } + + public ZooKeeperServiceCallServiceDiscoveryConfiguration(ServiceCallDefinition parent) { + super(parent, "zookeeper-service-discovery"); + } + + // ************************************************************************* + // Getter/Setter + // ************************************************************************* + + public String getNodes() { + return nodes; + } + + /** + * A comma separated list of servers to connect to in the form host:port + */ + public void setNodes(String nodes) { + this.nodes = nodes; + } + + public String getNamespace() { + return namespace; + } + + /** + * As ZooKeeper is a shared space, users of a given cluster should stay within + * a pre-defined namespace.
If a namespace is set here, all paths will get pre-pended + * with the namespace + */ + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public String getReconnectBaseSleepTime() { + return reconnectBaseSleepTime; + } + + /** + * Initial amount of time to wait between retries. + */ + public void setReconnectBaseSleepTime(String reconnectBaseSleepTime) { + this.reconnectBaseSleepTime = reconnectBaseSleepTime; + } + + public String getReconnectMaxSleepTime() { + return reconnectMaxSleepTime; + } + + /** + * Max time in ms to sleep on each retry + */ + public void setReconnectMaxSleepTime(String reconnectMaxSleepTime) { + this.reconnectMaxSleepTime = reconnectMaxSleepTime; + } + + public Integer getReconnectMaxRetries() { + return reconnectMaxRetries; + } + + /** + * Max number of times to retry + */ + public void setReconnectMaxRetries(Integer reconnectMaxRetries) { + this.reconnectMaxRetries = reconnectMaxRetries; + } + + public String getSessionTimeout() { + return sessionTimeout; + } + + /** + * Session timeout. + */ + public void setSessionTimeout(String sessionTimeout) { + this.sessionTimeout = sessionTimeout; + } + + public String getConnectionTimeout() { + return connectionTimeout; + } + + /** + * Connection timeout. 
+ */ + public void setConnectionTimeout(String connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + public String getBasePath() { + return basePath; + } + + /** + * Set the base path to store in ZK + */ + public void setBasePath(String basePath) { + this.basePath = basePath; + } + + // ************************************************************************* + // Fluent API + // ************************************************************************* + + public ZooKeeperServiceCallServiceDiscoveryConfiguration nodes(String nodes) { + setNodes(nodes); + return this; + } + + public ZooKeeperServiceCallServiceDiscoveryConfiguration namespace(String namespace) { + setNamespace(namespace); + return this; + } + + public ZooKeeperServiceCallServiceDiscoveryConfiguration reconnectBaseSleepTime(String reconnectBaseSleepTime) { + setReconnectBaseSleepTime(reconnectBaseSleepTime); + return this; + } + + public ZooKeeperServiceCallServiceDiscoveryConfiguration reconnectMaxSleepTime(String reconnectMaxSleepTime) { + setReconnectMaxSleepTime(reconnectMaxSleepTime); + return this; + } + + public ZooKeeperServiceCallServiceDiscoveryConfiguration reconnectMaxRetries(int reconnectMaxRetries) { + setReconnectMaxRetries(reconnectMaxRetries); + return this; + } + + public ZooKeeperServiceCallServiceDiscoveryConfiguration sessionTimeout(String sessionTimeout) { + setSessionTimeout(sessionTimeout); + return this; + } + + public ZooKeeperServiceCallServiceDiscoveryConfiguration connectionTimeout(String connectionTimeout) { + setConnectionTimeout(connectionTimeout); + return this; + } + + public ZooKeeperServiceCallServiceDiscoveryConfiguration basePath(String basePath) { + setBasePath(basePath); + return this; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/pom.xml 
b/components/camel-zookeeper/pom.xml index a3278b3..afb1b16 100644 --- a/components/camel-zookeeper/pom.xml +++ b/components/camel-zookeeper/pom.xml @@ -81,11 +81,20 @@ <version>${curator-version}</version> </dependency> - + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-discovery</artifactId> + <version>${curator-version}</version> + </dependency> <!-- test dependencies --> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-jetty</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-test-spring</artifactId> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorConfiguration.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorConfiguration.java new file mode 100644 index 0000000..7b773d0 --- /dev/null +++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorConfiguration.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeeper; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.camel.RuntimeCamelException; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.AuthInfo; +import org.apache.curator.framework.CuratorFramework; + +public class ZooKeeperCuratorConfiguration implements Cloneable { + private CuratorFramework curatorFramework; + private List<String> nodes; + private String namespace; + private long reconnectBaseSleepTime; + private TimeUnit reconnectBaseSleepTimeUnit; + private int reconnectMaxRetries; + private long reconnectMaxSleepTime; + private TimeUnit reconnectMaxSleepTimeUnit; + private long sessionTimeout; + private TimeUnit sessionTimeoutUnit; + private long connectionTimeout; + private TimeUnit connectionTimeoutUnit; + private List<AuthInfo> authInfoList; + private long maxCloseWait; + private TimeUnit maxCloseWaitUnit; + private RetryPolicy retryPolicy; + private String basePath; + + public ZooKeeperCuratorConfiguration() { + this.reconnectBaseSleepTime = 1000; + this.reconnectBaseSleepTimeUnit = TimeUnit.MILLISECONDS; + this.reconnectMaxSleepTime = Integer.MAX_VALUE; + this.reconnectMaxSleepTimeUnit = TimeUnit.MILLISECONDS; + this.reconnectMaxRetries = 3; + + // from org.apache.curator.framework.CuratorFrameworkFactory + this.sessionTimeout = Integer.getInteger("curator-default-session-timeout", 60 * 1000); + 
this.sessionTimeoutUnit = TimeUnit.MILLISECONDS; + + // from org.apache.curator.framework.CuratorFrameworkFactory + this.connectionTimeout = Integer.getInteger("curator-default-connection-timeout", 15 * 1000); + this.connectionTimeoutUnit = TimeUnit.MILLISECONDS; + + // from org.apache.curator.framework.CuratorFrameworkFactory + this.maxCloseWait = 1000; + this.maxCloseWaitUnit = TimeUnit.MILLISECONDS; + } + + // ******************************* + // Properties + // ******************************* + + public CuratorFramework getCuratorFramework() { + return curatorFramework; + } + + public void setCuratorFramework(CuratorFramework curatorFramework) { + this.curatorFramework = curatorFramework; + } + + public List<String> getNodes() { + return nodes; + } + + public void setNodes(String nodes) { + this.nodes = Collections.unmodifiableList( + Arrays.stream(nodes.split(",")).collect(Collectors.toList()) + ); + } + + public void setNodes(List<String> nodes) { + this.nodes = Collections.unmodifiableList(new ArrayList<>(nodes)); + } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public long getReconnectBaseSleepTime() { + return reconnectBaseSleepTime; + } + + public void setReconnectBaseSleepTime(long reconnectBaseSleepTime) { + this.reconnectBaseSleepTime = reconnectBaseSleepTime; + } + + public void setReconnectBaseSleepTime(long reconnectBaseSleepTime, TimeUnit reconnectBaseSleepTimeUnit) { + this.reconnectBaseSleepTime = reconnectBaseSleepTime; + this.reconnectBaseSleepTimeUnit = reconnectBaseSleepTimeUnit; + } + + public TimeUnit getReconnectBaseSleepTimeUnit() { + return reconnectBaseSleepTimeUnit; + } + + public void setReconnectBaseSleepTimeUnit(TimeUnit reconnectBaseSleepTimeUnit) { + this.reconnectBaseSleepTimeUnit = reconnectBaseSleepTimeUnit; + } + + public long getReconnectMaxSleepTime() { + return reconnectMaxSleepTime; + } + + public void 
setReconnectMaxSleepTime(long reconnectMaxSleepTime) { + this.reconnectMaxSleepTime = reconnectMaxSleepTime; + } + + /** + * Sets the max sleep time between reconnect attempts together with its unit. + * The second parameter keeps its original name but carries the unit of the max sleep time. + */ + public void setReconnectMaxSleepTime(long reconnectMaxSleepTime, TimeUnit reconnectBaseSleepTimeUnit) { + this.reconnectMaxSleepTime = reconnectMaxSleepTime; + // store the unit next to the value it qualifies; the base sleep time unit must not be touched here + this.reconnectMaxSleepTimeUnit = reconnectBaseSleepTimeUnit; + } + + public TimeUnit getReconnectMaxSleepTimeUnit() { + return reconnectMaxSleepTimeUnit; + } + + public void setReconnectMaxSleepTimeUnit(TimeUnit reconnectMaxSleepTimeUnit) { + this.reconnectMaxSleepTimeUnit = reconnectMaxSleepTimeUnit; + } + + public int getReconnectMaxRetries() { + return reconnectMaxRetries; + } + + public void setReconnectMaxRetries(int reconnectMaxRetries) { + this.reconnectMaxRetries = reconnectMaxRetries; + } + + public long getSessionTimeout() { + return sessionTimeout; + } + + public void setSessionTimeout(long sessionTimeout) { + this.sessionTimeout = sessionTimeout; + } + + public void setSessionTimeout(long sessionTimeout, TimeUnit sessionTimeoutUnit) { + this.sessionTimeout = sessionTimeout; + this.sessionTimeoutUnit = sessionTimeoutUnit; + } + + public TimeUnit getSessionTimeoutUnit() { + return sessionTimeoutUnit; + } + + public void setSessionTimeoutUnit(TimeUnit sessionTimeoutUnit) { + this.sessionTimeoutUnit = sessionTimeoutUnit; + } + + public long getConnectionTimeout() { + return connectionTimeout; + } + + public void setConnectionTimeout(long connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + public void setConnectionTimeout(long connectionTimeout, TimeUnit connectionTimeotUnit) { + this.connectionTimeout = connectionTimeout; + this.connectionTimeoutUnit = connectionTimeotUnit; + } + + public TimeUnit getConnectionTimeoutUnit() { + return connectionTimeoutUnit; + } + + public void setConnectionTimeoutUnit(TimeUnit connectionTimeoutUnit) { + this.connectionTimeoutUnit = connectionTimeoutUnit; + } + + public List<AuthInfo> getAuthInfoList() { + return authInfoList; +
} + + public void setAuthInfoList(List<AuthInfo> authInfoList) { + this.authInfoList = authInfoList; + } + + public long getMaxCloseWait() { + return maxCloseWait; + } + + public void setMaxCloseWait(long maxCloseWait) { + this.maxCloseWait = maxCloseWait; + } + + public TimeUnit getMaxCloseWaitUnit() { + return maxCloseWaitUnit; + } + + public void setMaxCloseWaitUnit(TimeUnit maxCloseWaitUnit) { + this.maxCloseWaitUnit = maxCloseWaitUnit; + } + + public RetryPolicy getRetryPolicy() { + return retryPolicy; + } + + public void setRetryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + } + + public String getBasePath() { + return basePath; + } + + public void setBasePath(String basePath) { + this.basePath = basePath; + } + + // ******************************* + // Clone + // ******************************* + + public ZooKeeperCuratorConfiguration copy() { + try { + return (ZooKeeperCuratorConfiguration)clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorHelper.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorHelper.java new file mode 100644 index 0000000..2ee0385 --- /dev/null +++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperCuratorHelper.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeeper; + +import java.util.Optional; + +import org.apache.camel.util.ObjectHelper; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; +import org.apache.curator.x.discovery.details.JsonInstanceSerializer; + +public final class ZooKeeperCuratorHelper { + private ZooKeeperCuratorHelper() { + } + + public static CuratorFramework createCurator(ZooKeeperCuratorConfiguration configuration) throws Exception { + CuratorFramework curator = configuration.getCuratorFramework(); + if (curator == null) { + // Validate parameters + ObjectHelper.notNull(configuration.getNodes(), "ZooKeeper Nodes"); + + RetryPolicy retryPolicy = configuration.getRetryPolicy(); + if (retryPolicy == null) { + retryPolicy = new ExponentialBackoffRetry( + (int)configuration.getReconnectBaseSleepTimeUnit().toMillis(configuration.getReconnectBaseSleepTime()), + (int)configuration.getReconnectMaxSleepTimeUnit().toMillis(configuration.getReconnectMaxSleepTime()), + configuration.getReconnectMaxRetries()); + } + + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() + 
.connectString(String.join(",", configuration.getNodes())) + .sessionTimeoutMs((int) configuration.getSessionTimeoutUnit().toMillis(configuration.getSessionTimeout())) + .connectionTimeoutMs((int) configuration.getConnectionTimeoutUnit().toMillis(configuration.getConnectionTimeout())) + .maxCloseWaitMs((int) configuration.getMaxCloseWaitUnit().toMillis(configuration.getMaxCloseWait())) + .retryPolicy(retryPolicy); + + Optional.ofNullable(configuration.getNamespace()).ifPresent(builder::namespace); + Optional.ofNullable(configuration.getAuthInfoList()).ifPresent(builder::authorization); + + curator = builder.build(); + } + + return curator; + } + + public static <T> ServiceDiscovery<T> createServiceDiscovery(ZooKeeperCuratorConfiguration configuration, CuratorFramework curator, Class<T> payloadType) { + return ServiceDiscoveryBuilder.builder(payloadType) + .client(curator) + .basePath(configuration.getBasePath()) + .serializer(new JsonInstanceSerializer<>(payloadType)) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscovery.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscovery.java new file mode 100644 index 0000000..3a7dde9 --- /dev/null +++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscovery.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeeper.cloud; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.cloud.ServiceDefinition; +import org.apache.camel.component.zookeeper.ZooKeeperCuratorConfiguration; +import org.apache.camel.component.zookeeper.ZooKeeperCuratorHelper; +import org.apache.camel.impl.cloud.DefaultServiceDefinition; +import org.apache.camel.impl.cloud.DefaultServiceDiscovery; +import org.apache.camel.util.ObjectHelper; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.codehaus.jackson.map.annotate.JsonRootName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZooKeeperServiceDiscovery extends DefaultServiceDiscovery { + private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperServiceDiscovery.class); + + private final ZooKeeperCuratorConfiguration configuration; + private final boolean managedInstance; + private CuratorFramework curator; + private ServiceDiscovery<MetaData> serviceDiscovery; + + public ZooKeeperServiceDiscovery(ZooKeeperCuratorConfiguration configuration) { + this.configuration = configuration; + this.curator = configuration.getCuratorFramework(); 
+ this.managedInstance = Objects.isNull(curator); + } + + // ********************************************* + // Lifecycle + // ********************************************* + + @Override + protected void doStart() throws Exception { + if (curator == null) { + // Validation + ObjectHelper.notNull(getCamelContext(), "Camel Context"); + ObjectHelper.notNull(configuration.getBasePath(), "ZooKeeper base path"); + + LOGGER.debug("Starting ZooKeeper Curator with namespace '{}', nodes: '{}'", + configuration.getNamespace(), + String.join(",", configuration.getNodes()) + ); + + curator = ZooKeeperCuratorHelper.createCurator(configuration); + curator.start(); + } + + if (serviceDiscovery == null) { + // Validation + ObjectHelper.notNull(configuration.getBasePath(), "ZooKeeper base path"); + + LOGGER.debug("Starting ZooKeeper ServiceDiscoveryBuilder with base path '{}'", + configuration.getBasePath() + ); + + serviceDiscovery = ZooKeeperCuratorHelper.createServiceDiscovery(configuration, curator, MetaData.class); + serviceDiscovery.start(); + } + + super.doStart(); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + if (serviceDiscovery != null) { + try { + serviceDiscovery.close(); + } catch (Exception e) { + LOGGER.warn("Error closing Curator ServiceDiscovery", e); + } + } + + if (curator != null && managedInstance) { + curator.close(); + } + } + + // ********************************************* + // Implementation + // ********************************************* + + @Override + public List<ServiceDefinition> getServices(String name) { + if (serviceDiscovery == null) { + return Collections.emptyList(); + } + + try { + return serviceDiscovery.queryForInstances(name).stream() + .map(si -> { + Map<String, String> meta = new HashMap<>(); + ObjectHelper.ifNotEmpty(si.getPayload(), meta::putAll); + + meta.put("service_name", si.getName()); + meta.put("service_id", si.getId()); + meta.put("service_type", si.getServiceType().name()); + + return 
new DefaultServiceDefinition( + si.getName(), + si.getAddress(), + si.getSslPort() != null ? si.getSslPort() : si.getPort(), + meta); + }) + .collect(Collectors.toList()); + } catch (Exception e) { + throw new RuntimeCamelException(e); + } + } + + // ********************************************* + // Helpers + // ********************************************* + + @JsonRootName("meta") + public static final class MetaData extends HashMap<String, String> { + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryFactory.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryFactory.java new file mode 100644 index 0000000..b3b0604 --- /dev/null +++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryFactory.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.zookeeper.cloud; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.cloud.ServiceDiscovery; +import org.apache.camel.cloud.ServiceDiscoveryFactory; +import org.apache.camel.component.zookeeper.ZooKeeperCuratorConfiguration; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.AuthInfo; +import org.apache.curator.framework.CuratorFramework; + +public class ZooKeeperServiceDiscoveryFactory implements ServiceDiscoveryFactory { + + private ZooKeeperCuratorConfiguration configuration; + + public ZooKeeperServiceDiscoveryFactory() { + this.configuration = new ZooKeeperCuratorConfiguration(); + } + + public ZooKeeperServiceDiscoveryFactory(ZooKeeperCuratorConfiguration configuration) { + this.configuration = configuration.copy(); + } + + // ********************************************* + // Properties + // ********************************************* + + public ZooKeeperCuratorConfiguration getConfiguration() { + return configuration; + } + + public void setConfiguration(ZooKeeperCuratorConfiguration configuration) { + this.configuration = configuration.copy(); + } + + public CuratorFramework getCuratorFramework() { + return configuration.getCuratorFramework(); + } + + public void setCuratorFramework(CuratorFramework curatorFramework) { + configuration.setCuratorFramework(curatorFramework); + } + + public List<String> getNodes() { + return configuration.getNodes(); + } + + public void setNodes(String nodes) { + configuration.setNodes(nodes); + } + + public void setNodes(List<String> nodes) { + configuration.setNodes(nodes); + } + + public String getNamespace() { + return configuration.getNamespace(); + } + + public void setNamespace(String namespace) { + configuration.setNamespace(namespace); + } + + public long getReconnectBaseSleepTime() { + return configuration.getReconnectBaseSleepTime(); + } + + public void 
setReconnectBaseSleepTime(long reconnectBaseSleepTime) { + configuration.setReconnectBaseSleepTime(reconnectBaseSleepTime); + } + + public void setReconnectBaseSleepTime(long reconnectBaseSleepTime, TimeUnit reconnectBaseSleepTimeUnit) { + configuration.setReconnectBaseSleepTime(reconnectBaseSleepTime, reconnectBaseSleepTimeUnit); + } + + public TimeUnit getReconnectBaseSleepTimeUnit() { + return configuration.getReconnectBaseSleepTimeUnit(); + } + + public void setReconnectBaseSleepTimeUnit(TimeUnit reconnectBaseSleepTimeUnit) { + configuration.setReconnectBaseSleepTimeUnit(reconnectBaseSleepTimeUnit); + } + + public long getReconnectMaxSleepTime() { + return configuration.getReconnectMaxSleepTime(); + } + + public void setReconnectMaxSleepTime(long reconnectMaxSleepTime) { + configuration.setReconnectMaxSleepTime(reconnectMaxSleepTime); + } + + public void setReconnectMaxSleepTime(long reconnectMaxSleepTime, TimeUnit reconnectBaseSleepTimeUnit) { + configuration.setReconnectMaxSleepTime(reconnectMaxSleepTime, reconnectBaseSleepTimeUnit); + } + + public TimeUnit getReconnectMaxSleepTimeUnit() { + return configuration.getReconnectMaxSleepTimeUnit(); + } + + public void setReconnectMaxSleepTimeUnit(TimeUnit reconnectMaxSleepTimeUnit) { + configuration.setReconnectMaxSleepTimeUnit(reconnectMaxSleepTimeUnit); + } + + public int getReconnectMaxRetries() { + return configuration.getReconnectMaxRetries(); + } + + public void setReconnectMaxRetries(int reconnectMaxRetries) { + configuration.setReconnectMaxRetries(reconnectMaxRetries); + } + + public long getSessionTimeout() { + return configuration.getSessionTimeout(); + } + + public void setSessionTimeout(long sessionTimeout) { + configuration.setSessionTimeout(sessionTimeout); + } + + public void setSessionTimeout(long sessionTimeout, TimeUnit sessionTimeoutUnit) { + configuration.setSessionTimeout(sessionTimeout, sessionTimeoutUnit); + } + + public TimeUnit getSessionTimeoutUnit() { + return 
configuration.getSessionTimeoutUnit(); + } + + public void setSessionTimeoutUnit(TimeUnit sessionTimeoutUnit) { + configuration.setSessionTimeoutUnit(sessionTimeoutUnit); + } + + public long getConnectionTimeout() { + return configuration.getConnectionTimeout(); + } + + public void setConnectionTimeout(long connectionTimeout) { + configuration.setConnectionTimeout(connectionTimeout); + } + + public void setConnectionTimeout(long connectionTimeout, TimeUnit connectionTimeotUnit) { + configuration.setConnectionTimeout(connectionTimeout, connectionTimeotUnit); + } + + public TimeUnit getConnectionTimeoutUnit() { + return configuration.getConnectionTimeoutUnit(); + } + + public void setConnectionTimeoutUnit(TimeUnit connectionTimeoutUnit) { + configuration.setConnectionTimeoutUnit(connectionTimeoutUnit); + } + + public ZooKeeperCuratorConfiguration copy() { + return configuration.copy(); + } + + public List<AuthInfo> getAuthInfoList() { + return configuration.getAuthInfoList(); + } + + public void setAuthInfoList(List<AuthInfo> authInfoList) { + configuration.setAuthInfoList(authInfoList); + } + + public long getMaxCloseWait() { + return configuration.getMaxCloseWait(); + } + + public void setMaxCloseWait(long maxCloseWait) { + configuration.setMaxCloseWait(maxCloseWait); + } + + public TimeUnit getMaxCloseWaitUnit() { + return configuration.getMaxCloseWaitUnit(); + } + + public void setMaxCloseWaitUnit(TimeUnit maxCloseWaitUnit) { + configuration.setMaxCloseWaitUnit(maxCloseWaitUnit); + } + + public RetryPolicy getRetryPolicy() { + return configuration.getRetryPolicy(); + } + + public void setRetryPolicy(RetryPolicy retryPolicy) { + configuration.setRetryPolicy(retryPolicy); + } + + public String getBasePath() { + return configuration.getBasePath(); + } + + public void setBasePath(String basePath) { + configuration.setBasePath(basePath); + } + + // ********************************************* + // Factory + // ********************************************* + + 
@Override + public ServiceDiscovery newInstance(CamelContext context) throws Exception { + ZooKeeperServiceDiscovery discovery = new ZooKeeperServiceDiscovery(configuration); + discovery.setCamelContext(context); + + return discovery; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java index 389d83c..6af1269 100644 --- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java +++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterService.java @@ -16,205 +16,182 @@ */ package org.apache.camel.component.zookeeper.ha; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; +import org.apache.camel.component.zookeeper.ZooKeeperCuratorConfiguration; +import org.apache.camel.component.zookeeper.ZooKeeperCuratorHelper; import org.apache.camel.impl.ha.AbstractCamelClusterService; import org.apache.camel.util.ObjectHelper; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.AuthInfo; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ZooKeeperClusterService extends AbstractCamelClusterService<ZooKeeperClusterView> { private static final Logger LOGGER = 
LoggerFactory.getLogger(ZooKeeperClusterService.class); - private CuratorFramework client; - private List<String> nodes; - private String namespace; - private long reconnectBaseSleepTime; - private TimeUnit reconnectBaseSleepTimeUnit; - private int reconnectMaxRetries; - private long sessionTimeout; - private TimeUnit sessionTimeoutUnit; - private long connectionTimeout; - private TimeUnit connectionTimeotUnit; - private List<AuthInfo> authInfoList; - private long maxCloseWait; - private TimeUnit maxCloseWaitUnit; - private boolean closeOnStop; - private RetryPolicy retryPolicy; + private CuratorFramework curator; + private ZooKeeperCuratorConfiguration configuration; + private boolean managedInstance; public ZooKeeperClusterService() { - this.reconnectBaseSleepTime = 1000; - this.reconnectBaseSleepTimeUnit = TimeUnit.MILLISECONDS; - this.reconnectMaxRetries = 3; - this.closeOnStop = true; - - // from org.apache.curator.framework.CuratorFrameworkFactory - this.sessionTimeout = Integer.getInteger("curator-default-session-timeout", 60 * 1000); - this.sessionTimeoutUnit = TimeUnit.MILLISECONDS; - - // from org.apache.curator.framework.CuratorFrameworkFactory - this.connectionTimeout = Integer.getInteger("curator-default-connection-timeout", 15 * 1000); - this.connectionTimeotUnit = TimeUnit.MILLISECONDS; + this.configuration = new ZooKeeperCuratorConfiguration(); + this.managedInstance = true; + } - // from org.apache.curator.framework.CuratorFrameworkFactory - this.maxCloseWait = 1000; - this.maxCloseWaitUnit = TimeUnit.MILLISECONDS; + public ZooKeeperClusterService(ZooKeeperCuratorConfiguration configuration) { + this.configuration = configuration.copy(); + this.managedInstance = true; } // ********************************************* // Properties // ********************************************* - public CuratorFramework getClient() { - return client; + public ZooKeeperCuratorConfiguration getConfiguration() { + return configuration; + } + + public void 
setConfiguration(ZooKeeperCuratorConfiguration configuration) { + this.configuration = configuration.copy(); + } + + public CuratorFramework getCuratorFramework() { + return configuration.getCuratorFramework(); } - public void setClient(CuratorFramework client) { - this.client = client; + public void setCuratorFramework(CuratorFramework curatorFramework) { + configuration.setCuratorFramework(curatorFramework); } public List<String> getNodes() { - return nodes; + return configuration.getNodes(); } public void setNodes(String nodes) { - this.nodes = Collections.unmodifiableList( - Arrays.stream(nodes.split(",")).collect(Collectors.toList()) - ); + configuration.setNodes(nodes); } public void setNodes(List<String> nodes) { - this.nodes = Collections.unmodifiableList(new ArrayList<>(nodes)); + configuration.setNodes(nodes); } public String getNamespace() { - return namespace; + return configuration.getNamespace(); } public void setNamespace(String namespace) { - this.namespace = namespace; + configuration.setNamespace(namespace); } public long getReconnectBaseSleepTime() { - return reconnectBaseSleepTime; + return configuration.getReconnectBaseSleepTime(); } public void setReconnectBaseSleepTime(long reconnectBaseSleepTime) { - this.reconnectBaseSleepTime = reconnectBaseSleepTime; + configuration.setReconnectBaseSleepTime(reconnectBaseSleepTime); } public void setReconnectBaseSleepTime(long reconnectBaseSleepTime, TimeUnit reconnectBaseSleepTimeUnit) { - this.reconnectBaseSleepTime = reconnectBaseSleepTime; - this.reconnectBaseSleepTimeUnit = reconnectBaseSleepTimeUnit; + configuration.setReconnectBaseSleepTime(reconnectBaseSleepTime, reconnectBaseSleepTimeUnit); } public TimeUnit getReconnectBaseSleepTimeUnit() { - return reconnectBaseSleepTimeUnit; + return configuration.getReconnectBaseSleepTimeUnit(); } public void setReconnectBaseSleepTimeUnit(TimeUnit reconnectBaseSleepTimeUnit) { - this.reconnectBaseSleepTimeUnit = reconnectBaseSleepTimeUnit; + 
configuration.setReconnectBaseSleepTimeUnit(reconnectBaseSleepTimeUnit); } public int getReconnectMaxRetries() { - return reconnectMaxRetries; + return configuration.getReconnectMaxRetries(); } public void setReconnectMaxRetries(int reconnectMaxRetries) { - this.reconnectMaxRetries = reconnectMaxRetries; + configuration.setReconnectMaxRetries(reconnectMaxRetries); } public long getSessionTimeout() { - return sessionTimeout; + return configuration.getSessionTimeout(); } public void setSessionTimeout(long sessionTimeout) { - this.sessionTimeout = sessionTimeout; + configuration.setSessionTimeout(sessionTimeout); } public void setSessionTimeout(long sessionTimeout, TimeUnit sessionTimeoutUnit) { - this.sessionTimeout = sessionTimeout; - this.sessionTimeoutUnit = sessionTimeoutUnit; + configuration.setSessionTimeout(sessionTimeout, sessionTimeoutUnit); } public TimeUnit getSessionTimeoutUnit() { - return sessionTimeoutUnit; + return configuration.getSessionTimeoutUnit(); } public void setSessionTimeoutUnit(TimeUnit sessionTimeoutUnit) { - this.sessionTimeoutUnit = sessionTimeoutUnit; + configuration.setSessionTimeoutUnit(sessionTimeoutUnit); } public long getConnectionTimeout() { - return connectionTimeout; + return configuration.getConnectionTimeout(); } public void setConnectionTimeout(long connectionTimeout) { - this.connectionTimeout = connectionTimeout; + configuration.setConnectionTimeout(connectionTimeout); } public void setConnectionTimeout(long connectionTimeout, TimeUnit connectionTimeotUnit) { - this.connectionTimeout = connectionTimeout; - this.connectionTimeotUnit = connectionTimeotUnit; + configuration.setConnectionTimeout(connectionTimeout, connectionTimeotUnit); } public TimeUnit getConnectionTimeotUnit() { - return connectionTimeotUnit; + return configuration.getConnectionTimeoutUnit(); } public void setConnectionTimeotUnit(TimeUnit connectionTimeotUnit) { - this.connectionTimeotUnit = connectionTimeotUnit; + 
configuration.setConnectionTimeoutUnit(connectionTimeotUnit); } public List<AuthInfo> getAuthInfoList() { - return authInfoList; + return configuration.getAuthInfoList(); } public void setAuthInfoList(List<AuthInfo> authInfoList) { - this.authInfoList = authInfoList; + configuration.setAuthInfoList(authInfoList); } public long getMaxCloseWait() { - return maxCloseWait; + return configuration.getMaxCloseWait(); } public void setMaxCloseWait(long maxCloseWait) { - this.maxCloseWait = maxCloseWait; + configuration.setMaxCloseWait(maxCloseWait); } public TimeUnit getMaxCloseWaitUnit() { - return maxCloseWaitUnit; + return configuration.getMaxCloseWaitUnit(); } public void setMaxCloseWaitUnit(TimeUnit maxCloseWaitUnit) { - this.maxCloseWaitUnit = maxCloseWaitUnit; + configuration.setMaxCloseWaitUnit(maxCloseWaitUnit); } - public boolean isCloseOnStop() { - return closeOnStop; + public RetryPolicy getRetryPolicy() { + return configuration.getRetryPolicy(); } - public void setCloseOnStop(boolean closeOnStop) { - this.closeOnStop = closeOnStop; + public void setRetryPolicy(RetryPolicy retryPolicy) { + configuration.setRetryPolicy(retryPolicy); } - public RetryPolicy getRetryPolicy() { - return retryPolicy; + public String getBasePath() { + return configuration.getBasePath(); } - public void setRetryPolicy(RetryPolicy retryPolicy) { - this.retryPolicy = retryPolicy; + public void setBasePath(String basePath) { + configuration.setBasePath(basePath); } // ********************************************* @@ -223,13 +200,18 @@ public class ZooKeeperClusterService extends AbstractCamelClusterService<ZooKeep @Override protected ZooKeeperClusterView createView(String namespace) throws Exception { - return new ZooKeeperClusterView(this, getOrCreateClient(), namespace); + + // Validation + ObjectHelper.notNull(getCamelContext(), "Camel Context"); + ObjectHelper.notNull(configuration.getBasePath(), "ZooKeeper base path"); + + return new ZooKeeperClusterView(this, configuration, 
getOrCreateCurator(), namespace); } @Override protected void doStart() throws Exception { // instantiate a new CuratorFramework - getOrCreateClient(); + getOrCreateCurator(); super.doStart(); } @@ -238,42 +220,30 @@ public class ZooKeeperClusterService extends AbstractCamelClusterService<ZooKeep protected void doStop() throws Exception { super.doStop(); - if (client != null && closeOnStop) { - client.close(); + if (curator != null && managedInstance) { + curator.close(); } } - private CuratorFramework getOrCreateClient() throws Exception { - if (client == null) { - // Validate parameters - ObjectHelper.notNull(getCamelContext(), "Camel Context"); - ObjectHelper.notNull(nodes, "ZooKeeper Nodes"); - - CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() - .connectString(String.join(",", nodes)) - .sessionTimeoutMs((int)sessionTimeoutUnit.toMillis(sessionTimeout)) - .connectionTimeoutMs((int)connectionTimeotUnit.toMillis(connectionTimeout)) - .maxCloseWaitMs((int)maxCloseWaitUnit.toMillis(maxCloseWait)) - .retryPolicy(retryPolicy()); + private CuratorFramework getOrCreateCurator() throws Exception { + if (curator == null) { + curator = configuration.getCuratorFramework(); - Optional.ofNullable(namespace).ifPresent(builder::namespace); - Optional.ofNullable(authInfoList).ifPresent(builder::authorization); + if (curator == null) { + managedInstance = true; - LOGGER.debug("Connect to ZooKeeper with namespace {}, nodes: {}", namespace, nodes); - client = builder.build(); + LOGGER.debug("Starting ZooKeeper Curator with namespace '{}', nodes: '{}'", + configuration.getNamespace(), + String.join(",", configuration.getNodes()) + ); - LOGGER.debug("Starting ZooKeeper client"); - client.start(); + curator = ZooKeeperCuratorHelper.createCurator(configuration); + curator.start(); + } else { + managedInstance = false; + } } - return this.client; - } - - private RetryPolicy retryPolicy() { - return retryPolicy != null - ? 
retryPolicy - : new ExponentialBackoffRetry( - (int)reconnectBaseSleepTimeUnit.toMillis(reconnectBaseSleepTime), - reconnectMaxRetries); + return curator; } } http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java index 86e530f..acb1ad8 100644 --- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java +++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.zookeeper.ZooKeeperCuratorConfiguration; import org.apache.camel.ha.CamelClusterMember; import org.apache.camel.ha.CamelClusterService; import org.apache.camel.impl.ha.AbstractCamelClusterView; @@ -36,14 +37,16 @@ import org.slf4j.LoggerFactory; final class ZooKeeperClusterView extends AbstractCamelClusterView { private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperClusterView.class); + private final ZooKeeperCuratorConfiguration configuration; private final CuratorFramework client; private final CuratorLocalMember localMember; private LeaderSelector leaderSelector; - public ZooKeeperClusterView(CamelClusterService cluster, CuratorFramework client, String namespace) { + public ZooKeeperClusterView(CamelClusterService cluster, ZooKeeperCuratorConfiguration configuration, CuratorFramework client, String namespace) { super(cluster, namespace); this.localMember = new CuratorLocalMember(); + 
this.configuration = configuration; this.client = client; } @@ -84,7 +87,7 @@ final class ZooKeeperClusterView extends AbstractCamelClusterView { @Override protected void doStart() throws Exception { if (leaderSelector == null) { - leaderSelector = new LeaderSelector(client, getNamespace(), new CamelLeaderElectionListener()); + leaderSelector = new LeaderSelector(client, configuration.getBasePath(), new CamelLeaderElectionListener()); leaderSelector.setId(getClusterService().getId()); leaderSelector.start(); } http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/cloud/zookeeper-service-discovery ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/cloud/zookeeper-service-discovery b/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/cloud/zookeeper-service-discovery new file mode 100644 index 0000000..6d8290de --- /dev/null +++ b/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/cloud/zookeeper-service-discovery @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +class=org.apache.camel.component.zookeeper.cloud.ZooKeeperServiceDiscoveryFactory http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.java new file mode 100644 index 0000000..6f3ba10 --- /dev/null +++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.zookeeper.cloud; + +import org.apache.camel.component.zookeeper.ZooKeeperTestSupport; +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.details.JsonInstanceSerializer; +import org.junit.Test; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +public class SpringZooKeeperServiceCallRouteTest extends CamelSpringTestSupport { + private static final int SERVER_PORT = 9001; + private static final String SERVICE_NAME = "http-service"; + private static final String SERVICE_PATH = "/camel"; + + private ZooKeeperTestSupport.TestZookeeperServer server; + private CuratorFramework curator; + private ServiceDiscovery<ZooKeeperServiceDiscovery.MetaData> discovery; + + // *********************** + // Setup / tear down + // *********************** + + @Override + public void doPreSetup() throws Exception { + super.doPreSetup(); + + server = new ZooKeeperTestSupport.TestZookeeperServer(SERVER_PORT, true); + ZooKeeperTestSupport.waitForServerUp("127.0.0.1:" + SERVER_PORT, 1000); + + curator = CuratorFrameworkFactory.builder() + .connectString("127.0.0.1:" + SERVER_PORT) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)) + .build(); + + discovery = ServiceDiscoveryBuilder.builder(ZooKeeperServiceDiscovery.MetaData.class) + .client(curator) + .basePath(SERVICE_PATH) + .serializer(new JsonInstanceSerializer<>(ZooKeeperServiceDiscovery.MetaData.class)) + .build(); + + curator.start(); + 
discovery.start(); + + discovery.registerService( + ServiceInstance.<ZooKeeperServiceDiscovery.MetaData>builder() + .address("127.0.0.1") + .port(9011) + .name(SERVICE_NAME) + .id("service-1") + .build()); + + discovery.registerService( + ServiceInstance.<ZooKeeperServiceDiscovery.MetaData>builder() + .address("127.0.0.1") + .port(9012) + .name(SERVICE_NAME) + .id("service-2") + .build()); + + discovery.registerService( + ServiceInstance.<ZooKeeperServiceDiscovery.MetaData>builder() + .address("127.0.0.1") + .port(9013) + .name(SERVICE_NAME) + .id("service-3") + .build()); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + + CloseableUtils.closeQuietly(discovery); + CloseableUtils.closeQuietly(curator); + + if (server != null) { + server.shutdown(); + } + } + + // *********************** + // Test + // *********************** + + @Test + public void testServiceCall() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(3); + getMockEndpoint("mock:result").expectedBodiesReceivedInAnyOrder("ping 9011", "ping 9012", "ping 9013"); + + template.sendBody("direct:start", "ping"); + template.sendBody("direct:start", "ping"); + template.sendBody("direct:start", "ping"); + + assertMockEndpointsSatisfied(); + } + + // *********************** + // Routes + // *********************** + + @Override + protected AbstractApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.xml"); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceCallRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceCallRouteTest.java 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceCallRouteTest.java new file mode 100644 index 0000000..7e244a4 --- /dev/null +++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceCallRouteTest.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.zookeeper.cloud; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.zookeeper.ZooKeeperTestSupport; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.details.JsonInstanceSerializer; +import org.junit.Test; + +public class ZooKeeperServiceCallRouteTest extends CamelTestSupport { + private static final int SERVER_PORT = AvailablePortFinder.getNextAvailable(); + private static final String SERVICE_NAME = "http-service"; + private static final int SERVICE_COUNT = 5; + private static final String SERVICE_PATH = "/camel"; + + private ZooKeeperTestSupport.TestZookeeperServer server; + private CuratorFramework curator; + private ServiceDiscovery<ZooKeeperServiceDiscovery.MetaData> discovery; + private List<ServiceInstance<ZooKeeperServiceDiscovery.MetaData>> instances; + private List<String> expectedBodies; + + // ************************************************************************* + // Setup / tear down + // ************************************************************************* + + @Override + protected void doPreSetup() throws Exception { + super.doPreSetup(); + + server = new ZooKeeperTestSupport.TestZookeeperServer(SERVER_PORT, true); + ZooKeeperTestSupport.waitForServerUp("127.0.0.1:" + SERVER_PORT, 1000); + + curator = CuratorFrameworkFactory.builder() + .connectString("127.0.0.1:" + SERVER_PORT) + 
.retryPolicy(new ExponentialBackoffRetry(1000, 3)) + .build(); + + discovery = ServiceDiscoveryBuilder.builder(ZooKeeperServiceDiscovery.MetaData.class) + .client(curator) + .basePath(SERVICE_PATH) + .serializer(new JsonInstanceSerializer<>(ZooKeeperServiceDiscovery.MetaData.class)) + .build(); + + curator.start(); + discovery.start(); + + instances = new ArrayList<>(SERVICE_COUNT); + expectedBodies = new ArrayList<>(SERVICE_COUNT); + + for (int i = 0; i < SERVICE_COUNT; i++) { + ServiceInstance<ZooKeeperServiceDiscovery.MetaData> instance = ServiceInstance.<ZooKeeperServiceDiscovery.MetaData>builder() + .address("127.0.0.1") + .port(AvailablePortFinder.getNextAvailable()) + .name(SERVICE_NAME) + .id("service-" + i) + .build(); + + discovery.registerService(instance); + instances.add(instance); + expectedBodies.add("ping on " + instance.getPort()); + } + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + + for (ServiceInstance<ZooKeeperServiceDiscovery.MetaData> instace : instances) { + try { + discovery.unregisterService(instace); + } catch (Exception e) { + // Ignore + } + } + + CloseableUtils.closeQuietly(discovery); + CloseableUtils.closeQuietly(curator); + + server.shutdown(); + } + + // ************************************************************************* + // Test + // ************************************************************************* + + @Test + public void testServiceCall() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(SERVICE_COUNT); + getMockEndpoint("mock:result").expectedBodiesReceivedInAnyOrder(expectedBodies); + + instances.forEach(r -> template.sendBody("direct:start", "ping")); + + assertMockEndpointsSatisfied(); + } + + // ************************************************************************* + // Route + // ************************************************************************* + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new 
RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .serviceCall() + .name(SERVICE_NAME) + .component("jetty") + .defaultLoadBalancer() + .zookeeperServiceDiscovery("127.0.0.1:" + SERVER_PORT, SERVICE_PATH) + .end() + .to("log:org.apache.camel.component.zookeeper.cloud?level=INFO&showAll=true&multiline=true") + .to("mock:result"); + + instances.forEach(r -> + fromF("jetty:http://%s:%d", r.getAddress(), r.getPort()) + .transform().simple("${in.body} on " + r.getPort()) + ); + } + }; + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryTest.java new file mode 100644 index 0000000..e6d5551 --- /dev/null +++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/cloud/ZooKeeperServiceDiscoveryTest.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeeper.cloud; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.cloud.ServiceDefinition; +import org.apache.camel.component.zookeeper.ZooKeeperCuratorConfiguration; +import org.apache.camel.component.zookeeper.ZooKeeperCuratorHelper; +import org.apache.camel.component.zookeeper.ZooKeeperTestSupport; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceInstance; +import org.junit.Assert; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class ZooKeeperServiceDiscoveryTest { + + @Test + public void testServiceDiscovery() throws Exception { + ZooKeeperCuratorConfiguration configuration = new ZooKeeperCuratorConfiguration(); + ServiceDiscovery<ZooKeeperServiceDiscovery.MetaData> zkDiscovery = null; + ZooKeeperTestSupport.TestZookeeperServer server = null; + int port = AvailablePortFinder.getNextAvailable(); + + try { + server = new ZooKeeperTestSupport.TestZookeeperServer(port, true); + ZooKeeperTestSupport.waitForServerUp("localhost:" + port, 1000); + + configuration.setBasePath("/camel"); + configuration.setCuratorFramework(CuratorFrameworkFactory.builder() + .connectString("localhost:" + port) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)) + .build() + ); + + zkDiscovery = ZooKeeperCuratorHelper.createServiceDiscovery( + configuration, + configuration.getCuratorFramework(), + ZooKeeperServiceDiscovery.MetaData.class + ); + + configuration.getCuratorFramework().start(); + zkDiscovery.start(); + + 
List<ServiceInstance<ZooKeeperServiceDiscovery.MetaData>> instances = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + ServiceInstance<ZooKeeperServiceDiscovery.MetaData> instance = ServiceInstance.<ZooKeeperServiceDiscovery.MetaData>builder() + .address("127.0.0.1") + .port(AvailablePortFinder.getNextAvailable()) + .name("my-service") + .id("service-" + i) + .build(); + + zkDiscovery.registerService(instance); + instances.add(instance); + } + + ZooKeeperServiceDiscovery discovery = new ZooKeeperServiceDiscovery(configuration); + discovery.start(); + + List<ServiceDefinition> services = discovery.getServices("my-service"); + assertNotNull(services); + assertEquals(3, services.size()); + + for (ServiceDefinition service : services) { + Assert.assertEquals( + 1, + instances.stream() + .filter( + i -> { + return i.getPort() == service.getPort() + && i.getAddress().equals(service.getHost()) + && i.getId().equals(service.getMetadata().get("service_id")) + && i.getName().equals(service.getName()); + } + ).count() + ); + } + + } finally { + CloseableUtils.closeQuietly(zkDiscovery); + CloseableUtils.closeQuietly(configuration.getCuratorFramework()); + + if (server != null) { + server.shutdown(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java index d73648b..1c32787 100644 --- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java +++ 
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusteredRoutePolicyTest.java @@ -84,13 +84,13 @@ public final class ZooKeeperClusteredRoutePolicyTest { ZooKeeperClusterService service = new ZooKeeperClusterService(); service.setId("node-" + id); service.setNodes("localhost:" + PORT); - service.setNamespace(null); + service.setBasePath("/camel"); DefaultCamelContext context = new DefaultCamelContext(); context.disableJMX(); context.setName("context-" + id); context.addService(service); - context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("/my-ns")); + context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.xml b/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.xml new file mode 100644 index 0000000..482ed76 --- /dev/null +++ b/components/camel-zookeeper/src/test/resources/org/apache/camel/component/zookeeper/cloud/SpringZooKeeperServiceCallRouteTest.xml @@ -0,0 +1,65 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring + http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <camelContext xmlns="http://camel.apache.org/schema/spring"> + + <!-- ************************************* --> + <!-- Routes --> + <!-- ************************************* --> + + <route id="scall"> + <from uri="direct:start"/> + <serviceCall name="http-service" component="jetty"> + <zookeeperServiceDiscovery nodes="localhost:9001" basePath="/camel"/> + </serviceCall> + <to uri="mock:result"/> + </route> + + <route> + <from uri="jetty:http://localhost:9011"/> + <transform> + <simple>${body} 9011</simple> + </transform> + </route> + + <route> + <from uri="jetty:http://localhost:9012"/> + <transform> + <simple>${body} 9012</simple> + </transform> + </route> + + <route> + <from uri="jetty:http://localhost:9013"/> + <transform> + <simple>${body} 9013</simple> + </transform> + </route> + + </camelContext> + +</beans> http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index fa39932..a322298 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -167,7 +167,7 @@ <cobertura-maven-plugin-version>2.7</cobertura-maven-plugin-version> 
<couchbase-client-version>1.4.13</couchbase-client-version> <couchbase-client-bundle-version>1.4.13_1</couchbase-client-bundle-version> - <curator-version>2.11.1</curator-version> + <curator-version>2.12.0</curator-version> <cxf-version>3.1.11</cxf-version> <cxf-version-range>[3.0,4.0)</cxf-version-range> <cxf-xjc-plugin-version>3.0.5</cxf-xjc-plugin-version> http://git-wip-us.apache.org/repos/asf/camel/blob/e6a7caec/platforms/karaf/features/src/main/resources/features.xml ---------------------------------------------------------------------- diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml index 82e57c0..b976d97 100644 --- a/platforms/karaf/features/src/main/resources/features.xml +++ b/platforms/karaf/features/src/main/resources/features.xml @@ -2197,7 +2197,10 @@ <bundle dependency='true'>mvn:org.apache.curator/curator-framework/${curator-version}</bundle> <bundle dependency='true'>mvn:org.apache.curator/curator-client/${curator-version}</bundle> <bundle dependency='true'>mvn:org.apache.curator/curator-recipes/${curator-version}</bundle> + <bundle dependency='true'>mvn:org.apache.curator/curator-x-discovery/${curator-version}</bundle> <bundle dependency='true'>mvn:com.google.guava/guava/${zookeeper-guava-version}</bundle> + <bundle dependency='true'>mvn:org.codehaus.jackson/jackson-core-asl/${jackson-version}</bundle> + <bundle dependency='true'>mvn:org.codehaus.jackson/jackson-mapper-asl/${jackson-version}</bundle> <bundle>mvn:org.apache.camel/camel-zookeeper/${project.version}</bundle> </feature> <feature name='camel-zookeeper-master' version='${project.version}' resolver='(obr)' start-level='50'>