Repository: camel Updated Branches: refs/heads/master ab6327dbf -> b20319938
camel-atomix: cluster service for both atomix replica and client Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b2031993 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b2031993 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b2031993 Branch: refs/heads/master Commit: b203199384efb63e61b7138b34c83aa736c4886a Parents: ab6327d Author: lburgazzoli <lburgazz...@gmail.com> Authored: Mon Jun 26 12:38:24 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Mon Jun 26 12:43:35 2017 +0200 ---------------------------------------------------------------------- .../atomix/AtomixConfigurationAware.java | 29 ++++ .../client/AtomixClientConfiguration.java | 13 ++ .../atomix/ha/AtomixClusterClientService.java | 134 +++++++++++++++++++ .../atomix/ha/AtomixClusterService.java | 13 +- .../component/atomix/ha/AtomixClusterView.java | 9 +- .../atomix/ha/AtomixClientRoutePolicyTest.java | 122 +++++++++++++++++ 6 files changed, 312 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b2031993/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java new file mode 100644 index 0000000..b13de80 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix; + +public interface AtomixConfigurationAware<C extends AtomixConfiguration> { + /** + * @return the configuration + */ + C getConfiguration(); + + /** + * @param configuration the configuration + */ + void setConfiguration(C configuration); +} http://git-wip-us.apache.org/repos/asf/camel/blob/b2031993/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java index 4791bde..616b6fe 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java @@ -17,6 +17,7 @@ package org.apache.camel.component.atomix.client; import io.atomix.AtomixClient; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.atomix.AtomixConfiguration; import org.apache.camel.spi.UriParam; @@ -38,4 +39,16 @@ public class AtomixClientConfiguration extends AtomixConfiguration<AtomixClient> public void setResultHeader(String resultHeader) { this.resultHeader = resultHeader; } + + // **************************************** + // Copy + // **************************************** + + public AtomixClientConfiguration copy() { + try { + return (AtomixClientConfiguration) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/b2031993/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java new file mode 100644 index 0000000..00695c0 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.ha; + +import java.util.List; + +import io.atomix.AtomixClient; +import io.atomix.catalyst.transport.Address; +import io.atomix.catalyst.transport.Transport; +import org.apache.camel.CamelContext; +import org.apache.camel.component.atomix.AtomixConfigurationAware; +import org.apache.camel.component.atomix.client.AtomixClientConfiguration; +import org.apache.camel.component.atomix.client.AtomixClientHelper; +import org.apache.camel.impl.ha.AbstractCamelClusterService; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class AtomixClusterClientService extends AbstractCamelClusterService<AtomixClusterView> implements AtomixConfigurationAware<AtomixClientConfiguration> { + private static final Logger LOGGER = LoggerFactory.getLogger(AtomixClusterClientService.class); + private AtomixClientConfiguration configuration; + private AtomixClient atomix; + + public AtomixClusterClientService() { + this.configuration = new AtomixClientConfiguration(); + } + + public AtomixClusterClientService(CamelContext camelContext, AtomixClientConfiguration configuration) { + super(null, camelContext); + + this.configuration = configuration.copy(); + } + + // ********************************** + // Properties + // ********************************** + + @Override + public AtomixClientConfiguration getConfiguration() { + return configuration; + } + + @Override + public void setConfiguration(AtomixClientConfiguration configuration) { + this.configuration = configuration.copy(); + } + + public List<Address> getNodes() { + return configuration.getNodes(); + } + + public void setNodes(List<Address> nodes) { + configuration.setNodes(nodes); + } + + public void setNodes(String nodes) { + configuration.setNodes(nodes); + } + + public Class<? extends Transport> getTransport() { + return configuration.getTransport(); + } + + public void setTransport(Class<? extends Transport> transport) { + configuration.setTransport(transport); + } + + public AtomixClient getAtomix() { + return configuration.getAtomix(); + } + + public void setAtomix(AtomixClient atomix) { + configuration.setAtomix(atomix); + } + + public String getConfigurationUri() { + return configuration.getConfigurationUri(); + } + + public void setConfigurationUri(String configurationUri) { + configuration.setConfigurationUri(configurationUri); + } + + // ********************************************* + // Lifecycle + // ********************************************* + + @Override + protected void doStart() throws Exception { + // instantiate a new atomix replica + getOrCreateClient(); + + super.doStart(); + } + + @Override + protected AtomixClusterView createView(String namespace) throws Exception { + return new AtomixClusterView(this, namespace, getOrCreateClient()); + } + + private AtomixClient getOrCreateClient() throws Exception { + if (atomix == null) { + // Validate parameters + ObjectHelper.notNull(getCamelContext(), "Camel Context"); + ObjectHelper.notNull(configuration, "Atomix Node Configuration"); + + if (ObjectHelper.isEmpty(configuration.getNodes())) { + throw new IllegalArgumentException("Atomix nodes should not be empty"); + } + + atomix = AtomixClientHelper.createClient(getCamelContext(), configuration); + + LOGGER.debug("Connect to cluster nodes: {}", configuration.getNodes()); + atomix.connect(configuration.getNodes()).join(); + LOGGER.debug("Connect to cluster done"); + } + + return this.atomix; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/b2031993/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java index 06f9432..a5a8e51 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java @@ -23,12 +23,13 @@ import io.atomix.catalyst.transport.Address; import io.atomix.catalyst.transport.Transport; import io.atomix.copycat.server.storage.StorageLevel; import org.apache.camel.CamelContext; +import org.apache.camel.component.atomix.AtomixConfigurationAware; import org.apache.camel.impl.ha.AbstractCamelClusterService; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class AtomixClusterService extends AbstractCamelClusterService<AtomixClusterView> { +public final class AtomixClusterService extends AbstractCamelClusterService<AtomixClusterView> implements AtomixConfigurationAware<AtomixClusterConfiguration> { private static final Logger LOGGER = LoggerFactory.getLogger(AtomixClusterService.class); private Address address; @@ -62,10 +63,12 @@ public final class AtomixClusterService extends AbstractCamelClusterService<Atom this.address = address; } + @Override public AtomixClusterConfiguration getConfiguration() { return configuration; } + @Override public void setConfiguration(AtomixClusterConfiguration configuration) { this.configuration = configuration.copy(); } @@ -78,14 +81,14 @@ public final class AtomixClusterService extends AbstractCamelClusterService<Atom configuration.setStoragePath(storagePath); } - public List<Address> getNodes() { - return configuration.getNodes(); - } - public StorageLevel getStorageLevel() { return configuration.getStorageLevel(); } + public List<Address> getNodes() { + return configuration.getNodes(); + } + public void setNodes(List<Address> nodes) { configuration.setNodes(nodes); } http://git-wip-us.apache.org/repos/asf/camel/blob/b2031993/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java index a255446..62104a7 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java @@ -26,7 +26,10 @@ import io.atomix.Atomix; import io.atomix.group.DistributedGroup; import io.atomix.group.GroupMember; import io.atomix.group.LocalMember; +import org.apache.camel.component.atomix.AtomixConfiguration; +import org.apache.camel.component.atomix.AtomixConfigurationAware; import org.apache.camel.ha.CamelClusterMember; +import org.apache.camel.ha.CamelClusterService; import org.apache.camel.impl.ha.AbstractCamelClusterView; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; @@ -39,7 +42,7 @@ final class AtomixClusterView extends AbstractCamelClusterView { private final AtomixLocalMember localMember; private DistributedGroup group; - AtomixClusterView(AtomixClusterService cluster, String namespace, Atomix atomix) { + AtomixClusterView(CamelClusterService cluster, String namespace, Atomix atomix) { super(cluster, namespace); this.atomix = atomix; @@ -86,8 +89,8 @@ final class AtomixClusterView extends AbstractCamelClusterView { if (!localMember.hasJoined()) { LOGGER.debug("Get group {}", getNamespace()); - final AtomixClusterService service = getClusterService().unwrap(AtomixClusterService.class); - final AtomixClusterConfiguration configuration = service.getConfiguration(); + final AtomixConfigurationAware service = AtomixConfigurationAware.class.cast(getClusterService()); + final AtomixConfiguration<?> configuration = service.getConfiguration(); group = this.atomix.getGroup( getNamespace(), http://git-wip-us.apache.org/repos/asf/camel/blob/b2031993/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java new file mode 100644 index 0000000..371eb58 --- /dev/null +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.ha; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import io.atomix.AtomixReplica; +import io.atomix.catalyst.transport.Address; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.atomix.client.AtomixFactory; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory; +import org.apache.camel.test.AvailablePortFinder; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class AtomixClientRoutePolicyTest { + private static final Address ADDRESS = new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()); + private static final Logger LOGGER = LoggerFactory.getLogger(AtomixClientRoutePolicyTest.class); + private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList()); + private static final List<String> RESULTS = new ArrayList<>(); + private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2); + private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size()); + + // ************************************ + // Test + // ************************************ + + @Test + public void test() throws Exception { + AtomixReplica boot = null; + + try { + boot = AtomixFactory.replica(ADDRESS); + + for (String id : CLIENTS) { + SCHEDULER.submit(() -> run(id)); + } + + LATCH.await(1, TimeUnit.MINUTES); + SCHEDULER.shutdownNow(); + + Assert.assertEquals(CLIENTS.size(), RESULTS.size()); + Assert.assertTrue(RESULTS.containsAll(CLIENTS)); + } finally { + if (boot != null) { + boot.shutdown(); + } + } + } + + // ************************************ + // Run a Camel node + // ************************************ + + private static void run(String id) { + try { + CountDownLatch contextLatch = new CountDownLatch(1); + + AtomixClusterClientService service = new AtomixClusterClientService(); + service.setId("node-" + id); + service.setNodes(Collections.singletonList(ADDRESS)); + + DefaultCamelContext context = new DefaultCamelContext(); + context.disableJMX(); + context.setName("context-" + id); + context.addService(service); + context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns")); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer:atomix?delay=1s&period=1s&repeatCount=1") + .routeId("route-" + id) + .process(e -> { + LOGGER.debug("Node {} done", id); + RESULTS.add(id); + // Shutdown the context later on to give a chance to + // other members to catch-up + SCHEDULER.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS); + }); + } + }); + + // Start the context after some random time so the startup order + // changes for each test. + Thread.sleep(ThreadLocalRandom.current().nextInt(500)); + context.start(); + + contextLatch.await(); + context.stop(); + + LATCH.countDown(); + } catch (Exception e) { + LOGGER.warn("", e); + } + } +}