Repository: camel
Updated Branches:
  refs/heads/master ab6327dbf -> b20319938


camel-atomix: cluster service for both atomix replica and client


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b2031993
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b2031993
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b2031993

Branch: refs/heads/master
Commit: b203199384efb63e61b7138b34c83aa736c4886a
Parents: ab6327d
Author: lburgazzoli <lburgazz...@gmail.com>
Authored: Mon Jun 26 12:38:24 2017 +0200
Committer: lburgazzoli <lburgazz...@gmail.com>
Committed: Mon Jun 26 12:43:35 2017 +0200

----------------------------------------------------------------------
 .../atomix/AtomixConfigurationAware.java        |  29 ++++
 .../client/AtomixClientConfiguration.java       |  13 ++
 .../atomix/ha/AtomixClusterClientService.java   | 134 +++++++++++++++++++
 .../atomix/ha/AtomixClusterService.java         |  13 +-
 .../component/atomix/ha/AtomixClusterView.java  |   9 +-
 .../atomix/ha/AtomixClientRoutePolicyTest.java  | 122 +++++++++++++++++
 6 files changed, 312 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b2031993/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java
new file mode 100644
index 0000000..b13de80
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfigurationAware.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix;
+
+public interface AtomixConfigurationAware<C extends AtomixConfiguration> {
+    /**
+     * @return the configuration
+     */
+    C getConfiguration();
+
+    /**
+     * @param configuration the configuration
+     */
+    void setConfiguration(C configuration);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b2031993/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java
index 4791bde..616b6fe 100644
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.atomix.client;
 
 import io.atomix.AtomixClient;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.atomix.AtomixConfiguration;
 import org.apache.camel.spi.UriParam;
 
@@ -38,4 +39,16 @@ public class AtomixClientConfiguration extends 
AtomixConfiguration<AtomixClient>
     public void setResultHeader(String resultHeader) {
         this.resultHeader = resultHeader;
     }
+
+    // ****************************************
+    // Copy
+    // ****************************************
+
+    public AtomixClientConfiguration copy() {
+        try {
+            return (AtomixClientConfiguration) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b2031993/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
new file mode 100644
index 0000000..00695c0
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.ha;
+
+import java.util.List;
+
+import io.atomix.AtomixClient;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.Transport;
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.atomix.AtomixConfigurationAware;
+import org.apache.camel.component.atomix.client.AtomixClientConfiguration;
+import org.apache.camel.component.atomix.client.AtomixClientHelper;
+import org.apache.camel.impl.ha.AbstractCamelClusterService;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class AtomixClusterClientService extends 
AbstractCamelClusterService<AtomixClusterView> implements 
AtomixConfigurationAware<AtomixClientConfiguration> {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AtomixClusterClientService.class);
+    private AtomixClientConfiguration configuration;
+    private AtomixClient atomix;
+
+    public AtomixClusterClientService() {
+        this.configuration = new AtomixClientConfiguration();
+    }
+
+    public AtomixClusterClientService(CamelContext camelContext, 
AtomixClientConfiguration configuration) {
+        super(null, camelContext);
+
+        this.configuration = configuration.copy();
+    }
+
+    // **********************************
+    // Properties
+    // **********************************
+
+    @Override
+    public AtomixClientConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    @Override
+    public void setConfiguration(AtomixClientConfiguration configuration) {
+        this.configuration = configuration.copy();
+    }
+
+    public List<Address> getNodes() {
+        return configuration.getNodes();
+    }
+
+    public void setNodes(List<Address> nodes) {
+        configuration.setNodes(nodes);
+    }
+
+    public void setNodes(String nodes) {
+        configuration.setNodes(nodes);
+    }
+
+    public Class<? extends Transport> getTransport() {
+        return configuration.getTransport();
+    }
+
+    public void setTransport(Class<? extends Transport> transport) {
+        configuration.setTransport(transport);
+    }
+
+    public AtomixClient getAtomix() {
+        return configuration.getAtomix();
+    }
+
+    public void setAtomix(AtomixClient atomix) {
+        configuration.setAtomix(atomix);
+    }
+
+    public String getConfigurationUri() {
+        return configuration.getConfigurationUri();
+    }
+
+    public void setConfigurationUri(String configurationUri) {
+        configuration.setConfigurationUri(configurationUri);
+    }
+
+    // *********************************************
+    // Lifecycle
+    // *********************************************
+
+    @Override
+    protected void doStart() throws Exception {
+        // instantiate a new atomix replica
+        getOrCreateClient();
+
+        super.doStart();
+    }
+
+    @Override
+    protected AtomixClusterView createView(String namespace) throws Exception {
+        return new AtomixClusterView(this, namespace, getOrCreateClient());
+    }
+
+    private AtomixClient getOrCreateClient() throws Exception {
+        if (atomix == null) {
+            // Validate parameters
+            ObjectHelper.notNull(getCamelContext(), "Camel Context");
+            ObjectHelper.notNull(configuration, "Atomix Node Configuration");
+
+            if (ObjectHelper.isEmpty(configuration.getNodes())) {
+                throw new IllegalArgumentException("Atomix nodes should not be 
empty");
+            }
+
+            atomix = AtomixClientHelper.createClient(getCamelContext(), 
configuration);
+
+            LOGGER.debug("Connect to cluster nodes: {}", 
configuration.getNodes());
+            atomix.connect(configuration.getNodes()).join();
+            LOGGER.debug("Connect to cluster done");
+        }
+
+        return this.atomix;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b2031993/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
index 06f9432..a5a8e51 100644
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
@@ -23,12 +23,13 @@ import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Transport;
 import io.atomix.copycat.server.storage.StorageLevel;
 import org.apache.camel.CamelContext;
+import org.apache.camel.component.atomix.AtomixConfigurationAware;
 import org.apache.camel.impl.ha.AbstractCamelClusterService;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class AtomixClusterService extends 
AbstractCamelClusterService<AtomixClusterView> {
+public final class AtomixClusterService extends 
AbstractCamelClusterService<AtomixClusterView>  implements 
AtomixConfigurationAware<AtomixClusterConfiguration> {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AtomixClusterService.class);
 
     private Address address;
@@ -62,10 +63,12 @@ public final class AtomixClusterService extends 
AbstractCamelClusterService<Atom
         this.address = address;
     }
 
+    @Override
     public AtomixClusterConfiguration getConfiguration() {
         return configuration;
     }
 
+    @Override
     public void setConfiguration(AtomixClusterConfiguration configuration) {
         this.configuration = configuration.copy();
     }
@@ -78,14 +81,14 @@ public final class AtomixClusterService extends 
AbstractCamelClusterService<Atom
         configuration.setStoragePath(storagePath);
     }
 
-    public List<Address> getNodes() {
-        return configuration.getNodes();
-    }
-
     public StorageLevel getStorageLevel() {
         return configuration.getStorageLevel();
     }
 
+    public List<Address> getNodes() {
+        return configuration.getNodes();
+    }
+
     public void setNodes(List<Address> nodes) {
         configuration.setNodes(nodes);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b2031993/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
index a255446..62104a7 100644
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
@@ -26,7 +26,10 @@ import io.atomix.Atomix;
 import io.atomix.group.DistributedGroup;
 import io.atomix.group.GroupMember;
 import io.atomix.group.LocalMember;
+import org.apache.camel.component.atomix.AtomixConfiguration;
+import org.apache.camel.component.atomix.AtomixConfigurationAware;
 import org.apache.camel.ha.CamelClusterMember;
+import org.apache.camel.ha.CamelClusterService;
 import org.apache.camel.impl.ha.AbstractCamelClusterView;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -39,7 +42,7 @@ final class AtomixClusterView extends 
AbstractCamelClusterView {
     private final AtomixLocalMember localMember;
     private DistributedGroup group;
 
-    AtomixClusterView(AtomixClusterService cluster, String namespace, Atomix 
atomix) {
+    AtomixClusterView(CamelClusterService cluster, String namespace, Atomix 
atomix) {
         super(cluster, namespace);
 
         this.atomix = atomix;
@@ -86,8 +89,8 @@ final class AtomixClusterView extends 
AbstractCamelClusterView {
         if (!localMember.hasJoined()) {
             LOGGER.debug("Get group {}", getNamespace());
 
-            final AtomixClusterService service = 
getClusterService().unwrap(AtomixClusterService.class);
-            final AtomixClusterConfiguration configuration = 
service.getConfiguration();
+            final AtomixConfigurationAware service = 
AtomixConfigurationAware.class.cast(getClusterService());
+            final AtomixConfiguration<?> configuration = 
service.getConfiguration();
 
             group = this.atomix.getGroup(
                 getNamespace(),

http://git-wip-us.apache.org/repos/asf/camel/blob/b2031993/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java
 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java
new file mode 100644
index 0000000..371eb58
--- /dev/null
+++ 
b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.ha;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import io.atomix.AtomixReplica;
+import io.atomix.catalyst.transport.Address;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.atomix.client.AtomixFactory;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class AtomixClientRoutePolicyTest {
+    private static final Address ADDRESS = new Address("127.0.0.1", 
AvailablePortFinder.getNextAvailable());
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AtomixClientRoutePolicyTest.class);
+    private static final List<String> CLIENTS = IntStream.range(0, 
3).mapToObj(Integer::toString).collect(Collectors.toList());
+    private static final List<String> RESULTS = new ArrayList<>();
+    private static final ScheduledExecutorService SCHEDULER = 
Executors.newScheduledThreadPool(CLIENTS.size() * 2);
+    private static final CountDownLatch LATCH = new 
CountDownLatch(CLIENTS.size());
+
+    // ************************************
+    // Test
+    // ************************************
+
+    @Test
+    public void test() throws Exception {
+        AtomixReplica boot = null;
+
+        try {
+            boot = AtomixFactory.replica(ADDRESS);
+
+            for (String id : CLIENTS) {
+                SCHEDULER.submit(() -> run(id));
+            }
+
+            LATCH.await(1, TimeUnit.MINUTES);
+            SCHEDULER.shutdownNow();
+
+            Assert.assertEquals(CLIENTS.size(), RESULTS.size());
+            Assert.assertTrue(RESULTS.containsAll(CLIENTS));
+        } finally {
+            if (boot != null) {
+                boot.shutdown();
+            }
+        }
+    }
+
+    // ************************************
+    // Run a Camel node
+    // ************************************
+
+    private static void run(String id) {
+        try {
+            CountDownLatch contextLatch = new CountDownLatch(1);
+
+            AtomixClusterClientService service = new 
AtomixClusterClientService();
+            service.setId("node-" + id);
+            service.setNodes(Collections.singletonList(ADDRESS));
+
+            DefaultCamelContext context = new DefaultCamelContext();
+            context.disableJMX();
+            context.setName("context-" + id);
+            context.addService(service);
+            
context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("timer:atomix?delay=1s&period=1s&repeatCount=1")
+                        .routeId("route-" + id)
+                        .process(e -> {
+                            LOGGER.debug("Node {} done", id);
+                            RESULTS.add(id);
+                            // Shutdown the context later on to give a chance 
to
+                            // other members to catch-up
+                            SCHEDULER.schedule(contextLatch::countDown, 2 + 
ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS);
+                        });
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+            context.stop();
+
+            LATCH.countDown();
+        } catch (Exception e) {
+            LOGGER.warn("", e);
+        }
+    }
+}

Reply via email to