http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-dns/src/main/java/org/apache/camel/component/dns/cloud/DnsServiceDiscoveryFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-dns/src/main/java/org/apache/camel/component/dns/cloud/DnsServiceDiscoveryFactory.java b/components/camel-dns/src/main/java/org/apache/camel/component/dns/cloud/DnsServiceDiscoveryFactory.java new file mode 100644 index 0000000..18568d7 --- /dev/null +++ b/components/camel-dns/src/main/java/org/apache/camel/component/dns/cloud/DnsServiceDiscoveryFactory.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.dns.cloud; + +import org.apache.camel.CamelContext; +import org.apache.camel.cloud.ServiceDiscovery; +import org.apache.camel.cloud.ServiceDiscoveryFactory; +import org.apache.camel.component.dns.DnsConfiguration; + +public class DnsServiceDiscoveryFactory implements ServiceDiscoveryFactory { + private final DnsConfiguration configuration; + + public DnsServiceDiscoveryFactory() { + this.configuration = new DnsConfiguration(); + } + + // ************************************************************************* + // Properties + // ************************************************************************* + + public String getProto() { + return configuration.getProto(); + } + + public void setProto(String proto) { + configuration.setProto(proto); + } + + public String getDomain() { + return configuration.getDomain(); + } + + public void setDomain(String domain) { + configuration.setDomain(domain); + } + + // ************************************************************************* + // Factory + // ************************************************************************* + + @Override + public ServiceDiscovery newInstance(CamelContext camelContext) throws Exception { + return new DnsServiceDiscovery(configuration); + } +}
http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallProcessor.java b/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallProcessor.java deleted file mode 100644 index f469428..0000000 --- a/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallProcessor.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.component.dns.processor.remote; - -import org.apache.camel.ExchangePattern; -import org.apache.camel.component.dns.DnsConfiguration; -import org.apache.camel.impl.remote.DefaultServiceCallProcessor; -import org.apache.camel.spi.ServiceCallServer; -import org.apache.camel.spi.ServiceCallServerListStrategy; - -/** - * @author lburgazzoli - */ -public class DnsServiceCallProcessor extends DefaultServiceCallProcessor<ServiceCallServer> { - public DnsServiceCallProcessor(String name, String scheme, String uri, ExchangePattern exchangePattern, DnsConfiguration conf) { - super(name, scheme, uri, exchangePattern); - } - - @Override - public void setServerListStrategy(ServiceCallServerListStrategy<ServiceCallServer> serverListStrategy) { - if (!(serverListStrategy instanceof DnsServiceCallServerListStrategy)) { - throw new IllegalArgumentException("ServerListStrategy is not an instance of DnsServiceCallServerListStrategy"); - } - - super.setServerListStrategy(serverListStrategy); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallProcessorFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallProcessorFactory.java b/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallProcessorFactory.java deleted file mode 100644 index 6c285a6..0000000 --- a/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallProcessorFactory.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.component.dns.processor.remote; - -import java.util.Map; -import java.util.Optional; - -import org.apache.camel.ExchangePattern; -import org.apache.camel.component.dns.DnsConfiguration; -import org.apache.camel.impl.remote.DefaultServiceCallProcessor; -import org.apache.camel.impl.remote.DefaultServiceCallProcessorFactory; -import org.apache.camel.spi.ProcessorFactory; -import org.apache.camel.spi.RouteContext; -import org.apache.camel.spi.ServiceCallServer; -import org.apache.camel.spi.ServiceCallServerListStrategy; -import org.apache.camel.util.ObjectHelper; - -/** - * {@link ProcessorFactory} that creates the Etcd implementation of the ServiceCall EIP. - */ -public class DnsServiceCallProcessorFactory extends DefaultServiceCallProcessorFactory<DnsConfiguration, ServiceCallServer> { - @Override - protected DnsConfiguration createConfiguration(RouteContext routeContext) throws Exception { - return new DnsConfiguration(); - } - - @Override - protected DefaultServiceCallProcessor createProcessor( - String name, - String component, - String uri, - ExchangePattern mep, - DnsConfiguration conf, - Map<String, String> properties) throws Exception { - - return new DnsServiceCallProcessor(name, component, uri, mep, conf); - } - - @Override - protected Optional<ServiceCallServerListStrategy> builtInServerListStrategy(DnsConfiguration conf, String name) throws Exception { - ServiceCallServerListStrategy strategy = null; - if (ObjectHelper.equal("ondemand", name, true)) { - strategy = new DnsServiceCallServerListStrategies.OnDemand(conf); - } - - return Optional.ofNullable(strategy); - } - - @Override - protected ServiceCallServerListStrategy<ServiceCallServer> createDefaultServerListStrategy(DnsConfiguration conf) throws Exception { - return new DnsServiceCallServerListStrategies.OnDemand(conf); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServer.java ---------------------------------------------------------------------- diff --git a/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServer.java b/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServer.java deleted file mode 100644 index 77b7082..0000000 --- a/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServer.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.dns.processor.remote; - -import java.util.Comparator; -import java.util.HashMap; -import java.util.Map; - -import org.apache.camel.impl.remote.DefaultServiceCallServer; -import org.xbill.DNS.SRVRecord; - -import static org.apache.camel.util.ObjectHelper.ifNotEmpty; - -public class DnsServiceCallServer extends DefaultServiceCallServer { - public static final Comparator<SRVRecord> COMPARATOR = comparator(); - - public DnsServiceCallServer(SRVRecord record) { - super( - record.getTarget().toString(true), - record.getPort(), - getRecordMetaData(record) - ); - } - - public static Comparator<SRVRecord> comparator() { - Comparator<SRVRecord> byPriority = (e1, e2) -> Integer.compare(e2.getPriority(), e1.getPriority()); - Comparator<SRVRecord> byWeight = (e1, e2) -> Integer.compare(e2.getWeight(), e1.getWeight()); - - return byPriority.thenComparing(byWeight); - } - - public static Map<String, String> getRecordMetaData(SRVRecord record) { - Map<String, String> meta = new HashMap<>(); - ifNotEmpty(record.getPriority(), val -> meta.put("priority", Integer.toString(val))); - ifNotEmpty(record.getWeight(), val -> meta.put("weight", Integer.toString(val))); - - return meta; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServerListStrategies.java ---------------------------------------------------------------------- diff --git a/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServerListStrategies.java b/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServerListStrategies.java deleted file mode 100644 index e69ff17..0000000 --- a/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServerListStrategies.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.dns.processor.remote; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; - -import org.apache.camel.component.dns.DnsConfiguration; -import org.apache.camel.spi.ServiceCallServer; -import org.xbill.DNS.Lookup; -import org.xbill.DNS.Record; -import org.xbill.DNS.SRVRecord; - - -public final class DnsServiceCallServerListStrategies { - private DnsServiceCallServerListStrategies() { - } - - public static final class OnDemand extends DnsServiceCallServerListStrategy { - private final DnsServiceLookupFactory lookupFactory; - - public OnDemand(DnsConfiguration configuration) throws Exception { - super(configuration); - this.lookupFactory = new DnsServiceLookupFactory(configuration); - } - - @Override - public List<ServiceCallServer> getUpdatedListOfServers(String name) { - final Lookup lookup = lookupFactory.apply(name); - final Record[] records = lookup.run(); - - List<ServiceCallServer> servers; - if (Objects.nonNull(records) && lookup.getResult() == Lookup.SUCCESSFUL) { - servers = Arrays.stream(records) - .filter(SRVRecord.class::isInstance) - .map(SRVRecord.class::cast) - .sorted(DnsServiceCallServer.COMPARATOR) - .map(DnsServiceCallServer::new) - .collect(Collectors.toList()); - } else { - servers = Collections.emptyList(); - } - - return servers; - } - - @Override - public String toString() { - return "OnDemand"; - } - } - - // ************************************************************************* - // Helpers - // ************************************************************************* - - public static DnsServiceCallServerListStrategy onDemand(DnsConfiguration configuration) throws Exception { - return new OnDemand(configuration); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServerListStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServerListStrategy.java b/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServerListStrategy.java deleted file mode 100644 index 85ae26a..0000000 --- a/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServerListStrategy.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.dns.processor.remote; - -import org.apache.camel.component.dns.DnsConfiguration; -import org.apache.camel.impl.remote.DefaultServiceCallServerListStrategy; -import org.apache.camel.spi.ServiceCallServer; - -public class DnsServiceCallServerListStrategy extends DefaultServiceCallServerListStrategy<ServiceCallServer> { - private final DnsConfiguration configuration; - - public DnsServiceCallServerListStrategy(DnsConfiguration configuration) { - this.configuration = configuration; - } - - protected DnsConfiguration getConfiguration() { - return configuration; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceLookupFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceLookupFactory.java b/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceLookupFactory.java deleted file mode 100644 index ce63c96..0000000 --- a/components/camel-dns/src/main/java/org/apache/camel/component/dns/processor/remote/DnsServiceLookupFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.component.dns.processor.remote; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; - -import org.apache.camel.RuntimeCamelException; -import org.apache.camel.component.dns.DnsConfiguration; -import org.xbill.DNS.Lookup; -import org.xbill.DNS.TextParseException; -import org.xbill.DNS.Type; - -public class DnsServiceLookupFactory implements Function<String, Lookup> { - private final DnsConfiguration configuration; - private final ConcurrentHashMap<String, Lookup> cache; - - public DnsServiceLookupFactory(DnsConfiguration configuration) { - this.configuration = configuration; - this.cache = new ConcurrentHashMap<>(); - } - - @Override - public Lookup apply(String name) { - return cache.computeIfAbsent(name, this::createLookup); - } - - private Lookup createLookup(String name) { - try { - return new Lookup( - String.format("%s.%s.%s", name, configuration.getProto(), configuration.getDomain()), - Type.SRV); - } catch (TextParseException e) { - throw new RuntimeCamelException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-dns/src/main/resources/META-INF/services/org/apache/camel/cloud/dns-service-discovery ---------------------------------------------------------------------- diff --git a/components/camel-dns/src/main/resources/META-INF/services/org/apache/camel/cloud/dns-service-discovery b/components/camel-dns/src/main/resources/META-INF/services/org/apache/camel/cloud/dns-service-discovery new file mode 100644 index 0000000..4c667da --- /dev/null +++ b/components/camel-dns/src/main/resources/META-INF/services/org/apache/camel/cloud/dns-service-discovery @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +class=org.apache.camel.component.dns.cloud.DnsServiceDiscoveryFactory http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-dns/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition ---------------------------------------------------------------------- diff --git a/components/camel-dns/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition b/components/camel-dns/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition deleted file mode 100644 index d0bd19b..0000000 --- a/components/camel-dns/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition +++ /dev/null @@ -1,18 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -class=org.apache.camel.component.dns.processor.remote.DnsServiceCallProcessorFactory http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-dns/src/test/java/org/apache/camel/component/dns/cloud/DnsServiceDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-dns/src/test/java/org/apache/camel/component/dns/cloud/DnsServiceDiscoveryTest.java b/components/camel-dns/src/test/java/org/apache/camel/component/dns/cloud/DnsServiceDiscoveryTest.java new file mode 100644 index 0000000..e204e95 --- /dev/null +++ b/components/camel-dns/src/test/java/org/apache/camel/component/dns/cloud/DnsServiceDiscoveryTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.dns.cloud; + +import java.util.List; + +import org.apache.camel.cloud.ServiceDefinition; +import org.apache.camel.component.dns.DnsConfiguration; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +public class DnsServiceDiscoveryTest { + @Test + public void testServiceDiscovery() throws Exception { + DnsConfiguration configuration = new DnsConfiguration(); + DnsServiceDiscovery discovery = new DnsServiceDiscovery(configuration); + + configuration.setDomain("gmail.com"); + configuration.setProto("_tcp"); + + List<ServiceDefinition> services = discovery.getUpdatedListOfServices("_xmpp-server"); + assertNotNull(services); + assertFalse(services.isEmpty()); + + for (ServiceDefinition service : services) { + assertFalse(service.getMetadata().isEmpty()); + assertNotNull(service.getMetadata().get("priority")); + assertNotNull(service.getMetadata().get("weight")); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-dns/src/test/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServerListStrategiesTest.java ---------------------------------------------------------------------- diff --git a/components/camel-dns/src/test/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServerListStrategiesTest.java b/components/camel-dns/src/test/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServerListStrategiesTest.java deleted file mode 100644 index e13eab2..0000000 --- a/components/camel-dns/src/test/java/org/apache/camel/component/dns/processor/remote/DnsServiceCallServerListStrategiesTest.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.dns.processor.remote; - -import java.util.List; - -import org.apache.camel.component.dns.DnsConfiguration; -import org.apache.camel.spi.ServiceCallServer; -import org.junit.Test; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; - -public class DnsServiceCallServerListStrategiesTest { - @Test - public void testOnDemand() throws Exception { - DnsConfiguration configuration = new DnsConfiguration(); - DnsServiceCallServerListStrategy strategy = DnsServiceCallServerListStrategies.onDemand(configuration); - - configuration.setDomain("gmail.com"); - configuration.setProto("_tcp"); - - List<ServiceCallServer> servers = strategy.getUpdatedListOfServers("_xmpp-server"); - assertNotNull(servers); - assertFalse(servers.isEmpty()); - - for (ServiceCallServer server : servers) { - assertFalse(server.getMetadata().isEmpty()); - assertNotNull(server.getMetadata().get("priority")); - assertNotNull(server.getMetadata().get("weight")); - } - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java index aeb089a..4c97ceb 100644 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java +++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java @@ -51,6 +51,10 @@ public class EtcdConfiguration { private final CamelContext camelContext; + public EtcdConfiguration() { + this.camelContext = null; + } + public EtcdConfiguration(CamelContext camelContext) { this.camelContext = camelContext; } http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdOnDemandServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdOnDemandServiceDiscovery.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdOnDemandServiceDiscovery.java new file mode 100644 index 0000000..6eb5e9f --- /dev/null +++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdOnDemandServiceDiscovery.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.etcd.cloud; + +import java.util.List; + +import org.apache.camel.cloud.ServiceDefinition; +import org.apache.camel.component.etcd.EtcdConfiguration; + +public class EtcdOnDemandServiceDiscovery extends EtcdServiceDiscovery { + public EtcdOnDemandServiceDiscovery(EtcdConfiguration configuration) throws Exception { + super(configuration); + } + + @Override + public List<ServiceDefinition> getUpdatedListOfServices(String name) { + return getServices(s -> name.equalsIgnoreCase(s.getName())); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdServiceDefinition.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdServiceDefinition.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdServiceDefinition.java new file mode 100644 index 0000000..8a9d276 --- /dev/null +++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdServiceDefinition.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.etcd.cloud; + +import java.util.Comparator; +import java.util.Map; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.camel.impl.cloud.DefaultServiceDefinition; + +public class EtcdServiceDefinition extends DefaultServiceDefinition { + public static final Comparator<EtcdServiceDefinition> COMPARATOR = comparator(); + + @JsonCreator + public EtcdServiceDefinition( + @JsonProperty("name") final String name, + @JsonProperty("address") final String address, + @JsonProperty("port") final Integer port, + @JsonProperty("tags") final Map<String, String> tags) { + super(name, address, port, tags); + } + + public static Comparator<EtcdServiceDefinition> comparator() { + Comparator<EtcdServiceDefinition> byAddress = (e1, e2) -> e2.getHost().compareTo(e1.getHost()); + Comparator<EtcdServiceDefinition> byPort = (e1, e2) -> Integer.compare(e2.getPort(), e1.getPort()); + + return byAddress.thenComparing(byPort); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdServiceDiscovery.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdServiceDiscovery.java new file mode 100644 index 0000000..edb0c4c --- /dev/null +++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdServiceDiscovery.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.etcd.cloud; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.databind.ObjectMapper; +import mousio.etcd4j.EtcdClient; +import mousio.etcd4j.requests.EtcdKeyGetRequest; +import mousio.etcd4j.responses.EtcdKeysResponse; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.cloud.ServiceDefinition; +import org.apache.camel.component.etcd.EtcdConfiguration; +import org.apache.camel.component.etcd.EtcdHelper; +import org.apache.camel.impl.cloud.DefaultServiceDiscovery; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class EtcdServiceDiscovery extends DefaultServiceDiscovery { + private static final Logger LOGGER = LoggerFactory.getLogger(EtcdServiceDiscovery.class); + private static final ObjectMapper MAPPER = EtcdHelper.createObjectMapper(); + + private final EtcdConfiguration configuration; + private EtcdClient client; + + EtcdServiceDiscovery(EtcdConfiguration configuration) { + this.configuration = configuration; + this.client = null; + } + + @Override + protected void doStart() throws Exception { + if (client == null) { + client = configuration.createClient(); + } + } + + @Override + protected void doStop() throws Exception { + if (client != null) { + client.close(); + client = null; + } + } + + protected EtcdConfiguration getConfiguration() { + return this.configuration; + } + + protected EtcdClient getClient() { + return this.client; + } + + protected EtcdServiceDefinition nodeFromString(String value) { + EtcdServiceDefinition server = null; + + try { + server = MAPPER.readValue(value, EtcdServiceDefinition.class); + } catch (Exception e) { + LOGGER.warn("", e); + } + + return server; + } + + protected List<ServiceDefinition> getServices() { + return getServices(s -> true); + } + + protected List<ServiceDefinition> getServices(Predicate<EtcdServiceDefinition> filter) { + List<ServiceDefinition> servers = Collections.emptyList(); + + if (isRunAllowed()) { + try { + final EtcdConfiguration conf = getConfiguration(); + final EtcdKeyGetRequest request = getClient().get(conf.getServicePath()).recursive(); + if (conf.hasTimeout()) { + request.timeout(conf.getTimeout(), TimeUnit.SECONDS); + } + + final EtcdKeysResponse response = request.send().get(); + + if (Objects.nonNull(response.node) && !response.node.nodes.isEmpty()) { + servers = response.node.nodes.stream() + .map(node -> node.value) + .filter(ObjectHelper::isNotEmpty) + .map(this::nodeFromString) + .filter(Objects::nonNull) + .filter(filter) + .sorted(EtcdServiceDefinition.COMPARATOR) + .collect(Collectors.toList()); + } + } catch (Exception e) { + throw new RuntimeCamelException(e); + } + } + + return servers; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdServiceDiscoveryFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdServiceDiscoveryFactory.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdServiceDiscoveryFactory.java new file mode 100644 index 0000000..5bf89e0 --- /dev/null +++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdServiceDiscoveryFactory.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.etcd.cloud; + +import org.apache.camel.CamelContext; +import org.apache.camel.cloud.ServiceDiscovery; +import org.apache.camel.cloud.ServiceDiscoveryFactory; +import org.apache.camel.component.etcd.EtcdConfiguration; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.jsse.SSLContextParameters; + +public class EtcdServiceDiscoveryFactory implements ServiceDiscoveryFactory { + private final EtcdConfiguration configuration; + private String type; + + public EtcdServiceDiscoveryFactory() { + this.configuration = new EtcdConfiguration(); + } + + // ************************************************************************* + // Properties + // ************************************************************************* + + public String getUris() { + return configuration.getUris(); + } + + public void setUris(String uris) { + configuration.setUris(uris); + } + + public SSLContextParameters getSslContextParameters() { + return configuration.getSslContextParameters(); + } + + public void setSslContextParameters(SSLContextParameters sslContextParameters) { + configuration.setSslContextParameters(sslContextParameters); + } + + public String getUserName() { + return configuration.getUserName(); + } + + public void setUserName(String userName) { + configuration.setUserName(userName); + } + + public String getPassword() { + return configuration.getPassword(); + } + + public void setPassword(String password) { + configuration.setPassword(password); + } + + public Integer getTimeToLive() { + return configuration.getTimeToLive(); + } + + public void setTimeToLive(Integer timeToLive) { + configuration.setTimeToLive(timeToLive); + } + + public Long getTimeout() { + return configuration.getTimeout(); + } + + public void setTimeout(Long timeout) { + configuration.setTimeout(timeout); + } + + public String getServicePath() { + return configuration.getServicePath(); + } + + public void setServicePath(String servicePath) { + configuration.setServicePath(servicePath); + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + // ************************************************************************* + // Factory + // ************************************************************************* + + @Override + public ServiceDiscovery newInstance(CamelContext camelContext) throws Exception { + return ObjectHelper.equal("watch", type, true) + ? new EtcdWatchServiceDiscovery(configuration) + : new EtcdOnDemandServiceDiscovery(configuration); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdWatchServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdWatchServiceDiscovery.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdWatchServiceDiscovery.java new file mode 100644 index 0000000..587ee25 --- /dev/null +++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/cloud/EtcdWatchServiceDiscovery.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.etcd.cloud; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import mousio.client.promises.ResponsePromise; +import mousio.etcd4j.responses.EtcdException; +import mousio.etcd4j.responses.EtcdKeysResponse; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.cloud.ServiceDefinition; +import org.apache.camel.component.etcd.EtcdConfiguration; +import org.apache.camel.component.etcd.EtcdHelper; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EtcdWatchServiceDiscovery + extends EtcdServiceDiscovery + implements ResponsePromise.IsSimplePromiseResponseHandler<EtcdKeysResponse> { + + private static final Logger LOGGER = LoggerFactory.getLogger(EtcdWatchServiceDiscovery.class); + private final AtomicReference<List<ServiceDefinition>> serversRef; + private final AtomicLong index; + private final String servicePath; + + public EtcdWatchServiceDiscovery(EtcdConfiguration configuration) throws Exception { + super(configuration); + + this.serversRef = new AtomicReference<>(); + this.index = new AtomicLong(0); + this.servicePath = ObjectHelper.notNull(configuration.getServicePath(), "servicePath"); + } + + @Override + public List<ServiceDefinition> getUpdatedListOfServices(String name) { + List<ServiceDefinition> servers = serversRef.get(); + if (servers == null) { + serversRef.set(getServices()); + watch(); + } + + return serversRef.get().stream() + .filter(s -> name.equalsIgnoreCase(s.getName())) + .collect(Collectors.toList()); + } + + // ************************************************************************* + // Watch + // ************************************************************************* + + @Override + public void onResponse(ResponsePromise<EtcdKeysResponse> promise) { + if (!isRunAllowed()) { + return; + } + + Throwable throwable = promise.getException(); + if (throwable != null && throwable instanceof EtcdException) { + EtcdException exception = (EtcdException) throwable; + if (EtcdHelper.isOutdatedIndexException(exception)) { + LOGGER.debug("Outdated index, key={}, cause={}", servicePath, exception.etcdCause); + index.set(exception.index + 1); + } + } else { + try { + EtcdKeysResponse response = promise.get(); + EtcdHelper.setIndex(index, response); + + serversRef.set(getServices()); + } catch (TimeoutException e) { + LOGGER.debug("Timeout watching for {}", getConfiguration().getServicePath()); + throwable = null; + } catch (Exception e) { + throwable = e; + } + } + + if (throwable == null) { + watch(); + } else { + throw new RuntimeCamelException(throwable); + } + } + + private void watch() { + if (!isRunAllowed()) { + return; + } + + try { + getClient().get(servicePath) + .recursive() + .waitForChange(index.get()) + .timeout(1, TimeUnit.SECONDS) + .send() + .addListener(this); + } catch (Exception e) { + throw new RuntimeCamelException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessor.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessor.java deleted file mode 100644 index f1200a2..0000000 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessor.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.component.etcd.processor.remote; - -import org.apache.camel.ExchangePattern; -import org.apache.camel.component.etcd.EtcdConfiguration; -import org.apache.camel.impl.remote.DefaultServiceCallProcessor; -import org.apache.camel.spi.ProcessorFactory; -import org.apache.camel.spi.ServiceCallServerListStrategy; - -/** - * {@link ProcessorFactory} that creates the Etcd implementation of the ServiceCall EIP. - */ -public class EtcdServiceCallProcessor extends DefaultServiceCallProcessor<EtcdServiceCallServer> { - public EtcdServiceCallProcessor(String name, String scheme, String uri, ExchangePattern exchangePattern, EtcdConfiguration conf) { - super(name, scheme, uri, exchangePattern); - } - - @Override - public void setServerListStrategy(ServiceCallServerListStrategy<EtcdServiceCallServer> serverListStrategy) { - if (!(serverListStrategy instanceof EtcdServiceCallServerListStrategy)) { - throw new IllegalArgumentException("ServerListStrategy is not an instance of EtcdServiceCallServerListStrategy"); - } - - super.setServerListStrategy(serverListStrategy); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessorFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessorFactory.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessorFactory.java deleted file mode 100644 index 370916e..0000000 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessorFactory.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.component.etcd.processor.remote; - -import java.util.Map; -import java.util.Optional; - -import org.apache.camel.ExchangePattern; -import org.apache.camel.component.etcd.EtcdConfiguration; -import org.apache.camel.impl.remote.DefaultServiceCallProcessor; -import org.apache.camel.impl.remote.DefaultServiceCallProcessorFactory; -import org.apache.camel.spi.ProcessorFactory; -import org.apache.camel.spi.RouteContext; -import org.apache.camel.spi.ServiceCallServerListStrategy; -import org.apache.camel.util.ObjectHelper; - -/** - * {@link ProcessorFactory} that creates the Etcd implementation of the ServiceCall EIP. - */ -public class EtcdServiceCallProcessorFactory extends DefaultServiceCallProcessorFactory<EtcdConfiguration, EtcdServiceCallServer> { - @Override - protected EtcdConfiguration createConfiguration(RouteContext routeContext) throws Exception { - return new EtcdConfiguration(routeContext.getCamelContext()); - } - - @Override - protected DefaultServiceCallProcessor createProcessor( - String name, - String component, - String uri, - ExchangePattern mep, - EtcdConfiguration conf, - Map<String, String> properties) throws Exception { - - return new EtcdServiceCallProcessor(name, component, uri, mep, conf); - } - - @Override - protected Optional<ServiceCallServerListStrategy> builtInServerListStrategy(EtcdConfiguration conf, String name) throws Exception { - ServiceCallServerListStrategy strategy = null; - if (ObjectHelper.equal("ondemand", name, true)) { - strategy = new EtcdServiceCallServerListStrategies.OnDemand(conf); - } else if (ObjectHelper.equal("watch", name, true)) { - strategy = new EtcdServiceCallServerListStrategies.OnDemand(conf); - } - - return Optional.ofNullable(strategy); - } - - @Override - protected ServiceCallServerListStrategy<EtcdServiceCallServer> createDefaultServerListStrategy(EtcdConfiguration conf) throws Exception { - return new EtcdServiceCallServerListStrategies.OnDemand(conf); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServer.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServer.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServer.java deleted file mode 100644 index 648b544..0000000 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServer.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.etcd.processor.remote; - -import java.util.Comparator; -import java.util.Map; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.camel.impl.remote.DefaultServiceCallServer; - -public class EtcdServiceCallServer extends DefaultServiceCallServer { - public static final Comparator<EtcdServiceCallServer> COMPARATOR = comparator(); - - private final String name; - - @JsonCreator - public EtcdServiceCallServer( - @JsonProperty("name") final String name, - @JsonProperty("address") final String address, - @JsonProperty("port") final Integer port, - @JsonProperty("tags") final Map<String, String> tags) { - super(address, port, tags); - - this.name = name; - } - - public String getName() { - return name; - } - - public static Comparator<EtcdServiceCallServer> comparator() { - Comparator<EtcdServiceCallServer> byAddress = (e1, e2) -> e2.getIp().compareTo(e1.getIp()); - Comparator<EtcdServiceCallServer> byPort = (e1, e2) -> Integer.compare(e2.getPort(), e1.getPort()); - - return byAddress.thenComparing(byPort); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategies.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategies.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategies.java deleted file mode 100644 index d65ced9..0000000 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategies.java +++ /dev/null @@ -1,203 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.etcd.processor.remote; - -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Predicate; -import java.util.stream.Collectors; - -import mousio.client.promises.ResponsePromise; -import mousio.etcd4j.requests.EtcdKeyGetRequest; -import mousio.etcd4j.responses.EtcdException; -import mousio.etcd4j.responses.EtcdKeysResponse; -import org.apache.camel.RuntimeCamelException; -import org.apache.camel.component.etcd.EtcdConfiguration; -import org.apache.camel.component.etcd.EtcdHelper; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class EtcdServiceCallServerListStrategies { - - private abstract static class AbstractStrategy extends EtcdServiceCallServerListStrategy { - AbstractStrategy(EtcdConfiguration configuration) throws Exception { - super(configuration); - } - - protected List<EtcdServiceCallServer> getServers() { - return getServers(s -> true); - } - - protected List<EtcdServiceCallServer> getServers(Predicate<EtcdServiceCallServer> filter) { - List<EtcdServiceCallServer> servers = Collections.emptyList(); - - if (isRunAllowed()) { - try { - final EtcdConfiguration conf = getConfiguration(); - final EtcdKeyGetRequest request = getClient().get(conf.getServicePath()).recursive(); - if (conf.hasTimeout()) { - request.timeout(conf.getTimeout(), TimeUnit.SECONDS); - } - - final EtcdKeysResponse response = request.send().get(); - - if (Objects.nonNull(response.node) && !response.node.nodes.isEmpty()) { - servers = response.node.nodes.stream() - .map(node -> node.value) - .filter(ObjectHelper::isNotEmpty) - .map(this::nodeFromString) - .filter(Objects::nonNull) - .filter(filter) - .sorted(EtcdServiceCallServer.COMPARATOR) - .collect(Collectors.toList()); - } - } catch (Exception e) { - throw new RuntimeCamelException(e); - } - } - - return servers; - } - } - - private EtcdServiceCallServerListStrategies() { - } - - public static final class OnDemand extends AbstractStrategy { - public OnDemand(EtcdConfiguration configuration) throws Exception { - super(configuration); - } - - @Override - public List<EtcdServiceCallServer> getUpdatedListOfServers(String name) { - return getServers(s -> name.equalsIgnoreCase(s.getName())); - } - - @Override - public String toString() { - return "EtcdServiceCallServerListStrategy.OnDemand"; - } - } - - public static final class Watch extends AbstractStrategy - implements ResponsePromise.IsSimplePromiseResponseHandler<EtcdKeysResponse> { - - private static final Logger LOGGER = LoggerFactory.getLogger(Watch.class); - private final AtomicReference<List<EtcdServiceCallServer>> serversRef; - private final AtomicLong index; - private final String servicePath; - - public Watch(EtcdConfiguration configuration) throws Exception { - super(configuration); - - this.serversRef = new AtomicReference<>(); - this.index = new AtomicLong(0); - this.servicePath = ObjectHelper.notNull(configuration.getServicePath(), "servicePath"); - } - - @Override - public List<EtcdServiceCallServer> getUpdatedListOfServers(String name) { - List<EtcdServiceCallServer> servers = serversRef.get(); - if (servers == null) { - serversRef.set(getServers()); - watch(); - } - - return serversRef.get().stream() - .filter(s -> name.equalsIgnoreCase(s.getName())) - .collect(Collectors.toList()); - } - - @Override - public String toString() { - return "EtcdServiceCallServerListStrategy.Watch"; - } - - // ************************************************************************* - // Watch - // ************************************************************************* - - @Override - public void onResponse(ResponsePromise<EtcdKeysResponse> promise) { - if (!isRunAllowed()) { - return; - } - - Throwable throwable = promise.getException(); - if (throwable != null && throwable instanceof EtcdException) { - EtcdException exception = (EtcdException) throwable; - if (EtcdHelper.isOutdatedIndexException(exception)) { - LOGGER.debug("Outdated index, key={}, cause={}", servicePath, exception.etcdCause); - index.set(exception.index + 1); - } - } else { - try { - EtcdKeysResponse response = promise.get(); - EtcdHelper.setIndex(index, response); - - serversRef.set(getServers()); - } catch (TimeoutException e) { - LOGGER.debug("Timeout watching for {}", getConfiguration().getServicePath()); - throwable = null; - } catch (Exception e) { - throwable = e; - } - } - - if (throwable == null) { - watch(); - } else { - throw new RuntimeCamelException(throwable); - } - } - - private void watch() { - if (!isRunAllowed()) { - return; - } - - try { - getClient().get(servicePath) - .recursive() - .waitForChange(index.get()) - .timeout(1, TimeUnit.SECONDS) - .send() - .addListener(this); - } catch (Exception e) { - throw new RuntimeCamelException(e); - } - } - } - - // ************************************************************************* - // Helpers - // ************************************************************************* - - public static EtcdServiceCallServerListStrategy onDemand(EtcdConfiguration configuration) throws Exception { - return new OnDemand(configuration); - } - - public static EtcdServiceCallServerListStrategy watch(EtcdConfiguration configuration) throws Exception { - return new Watch(configuration); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategy.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategy.java deleted file mode 100644 index a91156a..0000000 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategy.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.etcd.processor.remote; - -import com.fasterxml.jackson.databind.ObjectMapper; -import mousio.etcd4j.EtcdClient; -import org.apache.camel.component.etcd.EtcdConfiguration; -import org.apache.camel.component.etcd.EtcdHelper; -import org.apache.camel.impl.remote.DefaultServiceCallServerListStrategy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EtcdServiceCallServerListStrategy extends DefaultServiceCallServerListStrategy<EtcdServiceCallServer> { - private static final Logger LOGGER = LoggerFactory.getLogger(EtcdServiceCallServerListStrategy.class); - private static final ObjectMapper MAPPER = EtcdHelper.createObjectMapper(); - - private final EtcdConfiguration configuration; - private EtcdClient client; - - public EtcdServiceCallServerListStrategy(EtcdConfiguration configuration) { - this.configuration = configuration; - this.client = null; - } - - @Override - protected void doStart() throws Exception { - if (client == null) { - client = configuration.createClient(); - } - } - - @Override - protected void doStop() throws Exception { - if (client != null) { - client.close(); - client = null; - } - } - - protected EtcdConfiguration getConfiguration() { - return this.configuration; - } - - protected EtcdClient getClient() { - return this.client; - } - - protected EtcdServiceCallServer nodeFromString(String value) { - EtcdServiceCallServer server = null; - - try { - server = MAPPER.readValue(value, EtcdServiceCallServer.class); - } catch (Exception e) { - LOGGER.warn("", e); - } - - return server; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/main/resources/META-INF/services/org/apache/camel/cloud/etcd-service-discovery ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/resources/META-INF/services/org/apache/camel/cloud/etcd-service-discovery b/components/camel-etcd/src/main/resources/META-INF/services/org/apache/camel/cloud/etcd-service-discovery new file mode 100644 index 0000000..4d4505f --- /dev/null +++ b/components/camel-etcd/src/main/resources/META-INF/services/org/apache/camel/cloud/etcd-service-discovery @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +class=org.apache.camel.component.etcd.cloud.EtcdServiceDiscoveryFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition b/components/camel-etcd/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition deleted file mode 100644 index 0cd6c42..0000000 --- a/components/camel-etcd/src/main/resources/META-INF/services/org/apache/camel/model/ServiceCallDefinition +++ /dev/null @@ -1,18 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -class=org.apache.camel.component.etcd.processor.remote.EtcdServiceCallProcessorFactory http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/cloud/EtcdServiceCallRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/cloud/EtcdServiceCallRouteTest.java b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/cloud/EtcdServiceCallRouteTest.java new file mode 100644 index 0000000..766eeca --- /dev/null +++ b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/cloud/EtcdServiceCallRouteTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.etcd.cloud; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.databind.ObjectMapper; +import mousio.etcd4j.EtcdClient; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.cloud.ServiceDiscovery; +import org.apache.camel.component.etcd.EtcdConfiguration; +import org.apache.camel.component.etcd.EtcdHelper; +import org.apache.camel.component.etcd.EtcdTestSupport; +import org.junit.Test; + +public class EtcdServiceCallRouteTest extends EtcdTestSupport { + private static final ObjectMapper MAPPER = EtcdHelper.createObjectMapper(); + private static final String SERVICE_NAME = "http-service"; + private static final int SERVICE_COUNT = 5; + private static final int SERVICE_PORT_BASE = 8080; + + private EtcdClient client; + private List<Map<String, Object>> servers; + private List<String> expectedBodies; + + // ************************************************************************* + // Setup / tear down + // ************************************************************************* + + @Override + protected void doPreSetup() throws Exception { + client = getClient(); + + servers = new ArrayList<>(SERVICE_COUNT); + expectedBodies = new ArrayList<>(SERVICE_COUNT); + + for (int i = 0; i < SERVICE_COUNT; i++) { + Map<String, Object> server = new HashMap<>(); + server.put("name", SERVICE_NAME); + server.put("address", "127.0.0.1"); + server.put("port", SERVICE_PORT_BASE + i); + + client.put("/services/" + "service-" + i, MAPPER.writeValueAsString(server)).send().get(); + + servers.add(Collections.unmodifiableMap(server)); + expectedBodies.add("ping on " + (SERVICE_PORT_BASE + i)); + } + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + client.deleteDir("/services/").recursive().send().get(); + } + + // ************************************************************************* + // Test + // ************************************************************************* + + @Test + public void testServiceCall() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(SERVICE_COUNT); + getMockEndpoint("mock:result").expectedBodiesReceivedInAnyOrder(expectedBodies); + + servers.forEach(s -> template.sendBody("direct:start", "ping")); + + assertMockEndpointsSatisfied(); + } + + // ************************************************************************* + // Route + // ************************************************************************* + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + EtcdConfiguration configuration = new EtcdConfiguration(null); + ServiceDiscovery discovery = new EtcdOnDemandServiceDiscovery(configuration); + + from("direct:start") + .serviceCall() + .name(SERVICE_NAME) + .component("http") + .serviceDiscovery(discovery) + .end() + .to("log:org.apache.camel.component.etcd.processor.service?level=INFO&showAll=true&multiline=true") + .to("mock:result"); + + servers.forEach(s -> + fromF("jetty:http://%s:%d", s.get("address"), s.get("port")) + .transform().simple("${in.body} on " + s.get("port")) + ); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/cloud/EtcdServiceDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/cloud/EtcdServiceDiscoveryTest.java b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/cloud/EtcdServiceDiscoveryTest.java new file mode 100644 index 0000000..17ec0b2 --- /dev/null +++ b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/cloud/EtcdServiceDiscoveryTest.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.etcd.cloud; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import com.fasterxml.jackson.databind.ObjectMapper; +import mousio.etcd4j.EtcdClient; +import mousio.etcd4j.responses.EtcdException; +import org.apache.camel.cloud.ServiceDefinition; +import org.apache.camel.component.etcd.EtcdConfiguration; +import org.apache.camel.component.etcd.EtcdHelper; +import org.apache.camel.component.etcd.EtcdTestSupport; +import org.junit.Test; + +public class EtcdServiceDiscoveryTest extends EtcdTestSupport { + private static final ObjectMapper MAPPER = EtcdHelper.createObjectMapper(); + private static final EtcdConfiguration CONFIGURATION = new EtcdConfiguration(null); + private static final AtomicInteger PORT = new AtomicInteger(0); + + private EtcdClient client; + + @Override + public void doPreSetup() throws Exception { + client = getClient(); + try { + client.deleteDir(CONFIGURATION.getServicePath()).recursive().send().get(); + } catch (EtcdException e) { + // Ignore + } + } + + @Override + public void tearDown() throws Exception { + try { + client.deleteDir(CONFIGURATION.getServicePath()).recursive().send().get(); + client.close(); + client = null; + } catch (EtcdException e) { + // Ignore + } + } + + @Test + public void testOnDemandDiscovery() throws Exception { + for (int i = 0; i < 3; i++) { + addServer(client, "serviceType-1"); + } + for (int i = 0; i < 2; i++) { + addServer(client, "serviceType-2"); + } + + EtcdOnDemandServiceDiscovery strategy = new EtcdOnDemandServiceDiscovery(CONFIGURATION); + strategy.start(); + + List<ServiceDefinition> type1 = strategy.getUpdatedListOfServices("serviceType-1"); + assertEquals(3, type1.size()); + for (ServiceDefinition service : type1) { + assertNotNull(service.getMetadata()); + assertTrue(service.getMetadata().containsKey("service_name")); + assertTrue(service.getMetadata().containsKey("port_delta")); + } + + List<ServiceDefinition> type2 = strategy.getUpdatedListOfServices("serviceType-2"); + assertEquals(2, type2.size()); + for (ServiceDefinition service : type2) { + assertNotNull(service.getMetadata()); + assertTrue(service.getMetadata().containsKey("service_name")); + assertTrue(service.getMetadata().containsKey("port_delta")); + } + + strategy.stop(); + } + + @Test + public void testWatchDiscovery() throws Exception { + addServer(client, "serviceType-3"); + + EtcdWatchServiceDiscovery strategy = new EtcdWatchServiceDiscovery(CONFIGURATION); + strategy.start(); + + assertEquals(1, strategy.getUpdatedListOfServices("serviceType-3").size()); + + addServer(client, "serviceType-3"); + addServer(client, "serviceType-3"); + addServer(client, "serviceType-4"); + + Thread.sleep(250); + + assertEquals(3, strategy.getUpdatedListOfServices("serviceType-3").size()); + + strategy.stop(); + } + + private void addServer(EtcdClient client, String name) throws Exception { + int port = PORT.incrementAndGet(); + + Map<String, String> tags = new HashMap<>(); + tags.put("service_name", name); + tags.put("port_delta", Integer.toString(port)); + + Map<String, Object> server = new HashMap<>(); + server.put("name", name); + server.put("address", "127.0.0.1"); + server.put("port", 8000 + port); + server.put("tags", tags); + + client.put(CONFIGURATION.getServicePath() + "service-" + port, MAPPER.writeValueAsString(server)).send().get(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a811f400/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/cloud/SpringEtcdServiceCallDefaultRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/cloud/SpringEtcdServiceCallDefaultRouteTest.java b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/cloud/SpringEtcdServiceCallDefaultRouteTest.java new file mode 100644 index 0000000..1440ba2 --- /dev/null +++ b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/cloud/SpringEtcdServiceCallDefaultRouteTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.etcd.cloud; + +import java.net.URI; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import mousio.etcd4j.EtcdClient; +import org.apache.camel.component.etcd.EtcdConfiguration; +import org.apache.camel.component.etcd.EtcdHelper; +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.junit.Test; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +public class SpringEtcdServiceCallDefaultRouteTest extends CamelSpringTestSupport { + private static final ObjectMapper MAPPER = EtcdHelper.createObjectMapper(); + private static final EtcdConfiguration CONFIGURATION = new EtcdConfiguration(); + private static final EtcdClient CLIENT = new EtcdClient(URI.create("http://localhost:2379")); + + @Override + protected AbstractApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/component/etcd/cloud/SpringEtcdServiceCallDefaultRouteTest.xml"); + } + + // ************************************************************************* + // Setup / tear down + // ************************************************************************* + + @Override + public void doPreSetup() throws Exception { + JsonNode service1 = MAPPER.createObjectNode() + .put("name", "http-service") + .put("address", "127.0.0.1") + .put("port", "9091"); + JsonNode service2 = MAPPER.createObjectNode() + .put("name", "http-service") + .put("address", "127.0.0.1") + .put("port", "9092"); + + CLIENT.put(CONFIGURATION.getServicePath() + "service-1", MAPPER.writeValueAsString(service1)).send().get(); + CLIENT.put(CONFIGURATION.getServicePath() + "service-2", MAPPER.writeValueAsString(service2)).send().get(); + + super.doPreSetup(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + CLIENT.deleteDir(CONFIGURATION.getServicePath()).recursive().send().get(); + } + + // ************************************************************************* + // Test + // ************************************************************************* + + @Test + public void testServiceCall() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(2); + getMockEndpoint("mock:result").expectedBodiesReceivedInAnyOrder("9091", "9092"); + + template.sendBody("direct:start", null); + template.sendBody("direct:start", null); + + assertMockEndpointsSatisfied(); + } +}