http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java
new file mode 100644
index 0000000..696b6eb
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.multimap;
+
+import java.time.Duration;
+
+import io.atomix.collections.DistributedMultiMap;
+import io.atomix.resource.ReadConsistency;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.InvokeOnHeader;
+import org.apache.camel.Message;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer;
+import org.apache.camel.util.ObjectHelper;
+
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_KEY;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_TTL;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE;
+
+final class AtomixMultiMapProducer extends 
AbstractAtomixClientProducer<AtomixMultiMapEndpoint, DistributedMultiMap> {
+    private final AtomixMultiMapConfiguration configuration;
+
+    protected AtomixMultiMapProducer(AtomixMultiMapEndpoint endpoint) {
+        super(endpoint);
+        this.configuration = endpoint.getConfiguration();
+    }
+
+    // *********************************
+    // Handlers
+    // *********************************
+
+    @InvokeOnHeader("PUT")
+    boolean onPut(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMultiMap<Object, Object> map = getResource(message);
+        final Object key = message.getHeader(RESOURCE_KEY, 
configuration::getKey, Object.class);
+        final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, 
Object.class);
+        final long ttl = message.getHeader(RESOURCE_TTL, 
configuration::getTtl, long.class);
+
+        ObjectHelper.notNull(key, RESOURCE_KEY);
+        ObjectHelper.notNull(val, RESOURCE_VALUE);
+
+        if (ttl > 0) {
+            map.put(key, val, Duration.ofMillis(ttl)).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            map.put(key, val).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    @InvokeOnHeader("GET")
+    boolean onGet(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMultiMap<Object, Object> map = getResource(message);
+        final Object key = message.getHeader(RESOURCE_KEY, 
configuration::getKey, Object.class);
+        final ReadConsistency consistency = 
message.getHeader(RESOURCE_READ_CONSISTENCY,  
configuration::getReadConsistency, ReadConsistency.class);
+
+        ObjectHelper.notNull(key, RESOURCE_KEY);
+
+        if (consistency != null) {
+            map.get(key, consistency).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            map.get(key).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    @InvokeOnHeader("CLEAR")
+    boolean onClear(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMultiMap<Object, Object> map = getResource(message);
+
+        map.clear().thenAccept(
+            result -> processResult(message, callback, result)
+        );
+
+        return false;
+    }
+
+    @InvokeOnHeader("SIZE")
+    boolean onSize(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMultiMap<Object, Object> map = getResource(message);
+        final ReadConsistency consistency = 
message.getHeader(RESOURCE_READ_CONSISTENCY,  
configuration::getReadConsistency, ReadConsistency.class);
+        final Object key = message.getHeader(RESOURCE_KEY, message::getBody, 
Object.class);
+
+        if (consistency != null) {
+            if (key != null) {
+                map.size(key, consistency).thenAccept(
+                    result -> processResult(message, callback, result)
+                );
+            } else {
+                map.size(consistency).thenAccept(
+                    result -> processResult(message, callback, result)
+                );
+            }
+        } else {
+            if (key != null) {
+                map.size(key).thenAccept(
+                    result -> processResult(message, callback, result)
+                );
+            } else {
+                map.size().thenAccept(
+                    result -> processResult(message, callback, result)
+                );
+            }
+        }
+
+        return false;
+    }
+
+    @InvokeOnHeader("IS_EMPTY")
+    boolean onIsEmpty(Message message, AsyncCallback callback) throws 
Exception {
+        final DistributedMultiMap<Object, Object> map = getResource(message);
+        final ReadConsistency consistency = 
message.getHeader(RESOURCE_READ_CONSISTENCY,  
configuration::getReadConsistency, ReadConsistency.class);
+
+        if (consistency != null) {
+            map.isEmpty(consistency).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            map.isEmpty().thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    @InvokeOnHeader("CONTAINS_KEY")
+    boolean onContainsKey(Message message, AsyncCallback callback) throws 
Exception {
+        final DistributedMultiMap<Object, Object> map = getResource(message);
+        final ReadConsistency consistency = 
message.getHeader(RESOURCE_READ_CONSISTENCY,  
configuration::getReadConsistency, ReadConsistency.class);
+        final Object key = message.getHeader(RESOURCE_KEY, message::getBody, 
Object.class);
+
+        ObjectHelper.notNull(key, RESOURCE_KEY);
+
+        if (consistency != null) {
+            map.containsKey(key, consistency).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            map.containsKey(key).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+//    @InvokeOnHeader("CONTAINS_VALUE")
+//    boolean onContainsValue(Message message, AsyncCallback callback) throws 
Exception {
+//        final DistributedMultiMap<Object, Object> map = getResource(message);
+//        final ReadConsistency consistency = 
message.getHeader(RESOURCE_READ_CONSISTENCY,  
configuration::getReadConsistency, ReadConsistency.class);
+//        final Object value = message.getHeader(RESOURCE_VALUE, 
message::getBody, Object.class);
+//
+//        ObjectHelper.notNull(value, RESOURCE_VALUE);
+//
+//        if (consistency != null) {
+//            map.containsValue(value, consistency).thenAccept(
+//                result -> processResult(message, callback, result)
+//            );
+//        } else {
+//            map.containsValue(value).thenAccept(
+//                result -> processResult(message, callback, result)
+//            );
+//        }
+//
+//        return false;
+//    }
+
+//    @InvokeOnHeader("CONTAINS_ENTRY")
+//    boolean onContainsEntry(Message message, AsyncCallback callback) throws 
Exception {
+//        final DistributedMultiMap<Object, Object> map = getResource(message);
+//        final ReadConsistency consistency = 
message.getHeader(RESOURCE_READ_CONSISTENCY,  
configuration::getReadConsistency, ReadConsistency.class);
+//        final Object key = message.getHeader(RESOURCE_KEY, message::getBody, 
Object.class);
+//        final Object value = message.getHeader(RESOURCE_VALUE, 
message::getBody, Object.class);
+//
+//        ObjectHelper.notNull(key, RESOURCE_VALUE);
+//        ObjectHelper.notNull(value, RESOURCE_KEY);
+//
+//        if (consistency != null) {
+//            map.containsEntry(key, value, consistency).thenAccept(
+//                result -> processResult(message, callback, result)
+//            );
+//        } else {
+//            map.containsEntry(key, value).thenAccept(
+//                result -> processResult(message, callback, result)
+//            );
+//        }
+//
+//        return false;
+//    }
+
+    @InvokeOnHeader("REMOVE")
+    boolean onRemove(Message message, AsyncCallback callback) throws Exception 
{
+        final DistributedMultiMap<Object, Object> map = getResource(message);
+        final Object key = message.getHeader(RESOURCE_KEY, message::getBody, 
Object.class);
+        final Object value = message.getHeader(RESOURCE_VALUE, 
message::getBody, Object.class);
+
+        ObjectHelper.notNull(key, RESOURCE_KEY);
+
+        if (value != null) {
+            map.remove(key, value).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            map.remove(key).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    @InvokeOnHeader("REMOVE_VALUE")
+    boolean onRemoveValue(Message message, AsyncCallback callback) throws 
Exception {
+        final DistributedMultiMap<Object, Object> map = getResource(message);
+        final Object value = message.getHeader(RESOURCE_VALUE, 
message::getBody, Object.class);
+
+        ObjectHelper.notNull(value, RESOURCE_VALUE);
+
+        map.removeValue(value).thenAccept(
+            result -> processResult(message, callback, result)
+        );
+
+        return false;
+    }
+
+    // *********************************
+    // Implementation
+    // *********************************
+
+
+    @Override
+    protected String getProcessorKey(Message message) {
+        return message.getHeader(RESOURCE_ACTION, 
configuration::getDefaultAction, String.class);
+    }
+
+    @Override
+    protected String getResourceName(Message message) {
+        return message.getHeader(RESOURCE_NAME, 
getAtomixEndpoint()::getResourceName, String.class);
+    }
+
+    @Override
+    protected DistributedMultiMap<Object, Object> createResource(String 
resourceName) {
+        return getAtomixEndpoint()
+            .getAtomix()
+            .getMultiMap(
+                resourceName,
+                new 
DistributedMultiMap.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)),
+                new 
DistributedMultiMap.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName)))
+            .join();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueue.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueue.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueue.java
new file mode 100644
index 0000000..edc8cb5
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueue.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.queue;
+
+public final class AtomixQueue {
+
+    public enum Action {
+        ADD,
+        OFFER,
+        PEEK,
+        POLL,
+        CLEAR,
+        CONTAINS,
+        IS_EMPTY,
+        REMOVE,
+        SIZE
+    }
+
+    private AtomixQueue() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueComponent.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueComponent.java
new file mode 100644
index 0000000..083a851
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueComponent.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.queue;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientComponent;
+
+public final class AtomixQueueComponent extends 
AbstractAtomixClientComponent<AtomixQueueConfiguration> {
+    private AtomixQueueConfiguration configuration = new 
AtomixQueueConfiguration();
+
+    public AtomixQueueComponent() {
+        super();
+    }
+
+    public AtomixQueueComponent(CamelContext camelContext) {
+        super(camelContext);
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
+        AtomixQueueConfiguration configuration = this.configuration.copy();
+
+        // Bind options to the configuration object
+        setConfigurationProperties(configuration, parameters);
+
+        AtomixQueueEndpoint endpoint = new AtomixQueueEndpoint(uri, this, 
remaining);
+        endpoint.setConfiguration(configuration);
+
+        setProperties(endpoint, parameters);
+
+        return endpoint;
+    }
+
+    // **********************************************
+    // Properties
+    // **********************************************
+
+    public AtomixQueueConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    /**
+     * The shared component configuration
+     */
+    public void setConfiguration(AtomixQueueConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    @Override
+    protected AtomixQueueConfiguration getComponentConfiguration() {
+        return getConfiguration();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConfiguration.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConfiguration.java
new file mode 100644
index 0000000..0175e69
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConfiguration.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.queue;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.atomix.client.AtomixClientConfiguration;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+
+@UriParams
+public class AtomixQueueConfiguration extends AtomixClientConfiguration {
+    @UriParam(defaultValue = "ADD")
+    private AtomixQueue.Action defaultAction = AtomixQueue.Action.ADD;
+
+    // ****************************************
+    // Properties
+    // ****************************************
+
+    public AtomixQueue.Action getDefaultAction() {
+        return defaultAction;
+    }
+
+    /**
+     * The default action.
+     */
+    public void setDefaultAction(AtomixQueue.Action defaultAction) {
+        this.defaultAction = defaultAction;
+    }
+
+    // ****************************************
+    // Copy
+    // ****************************************
+
+    public AtomixQueueConfiguration copy() {
+        try {
+            return (AtomixQueueConfiguration) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java
new file mode 100644
index 0000000..eef6eec
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.atomix.catalyst.concurrent.Listener;
+import io.atomix.collections.DistributedQueue;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientConsumer;
+import org.apache.camel.component.atomix.client.AtomixClientConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class AtomixQueueConsumer extends 
AbstractAtomixClientConsumer<AtomixQueueEndpoint> {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AtomixQueueConsumer.class);
+
+    private final List<Listener<DistributedQueue.ValueEvent<Object>>> 
listeners;
+    private final String resourceName;
+    private final String resultHeader;
+    private DistributedQueue<Object> queue;
+
+    public AtomixQueueConsumer(AtomixQueueEndpoint endpoint, Processor 
processor, String resourceName) {
+        super(endpoint, processor);
+        this.listeners = new ArrayList<>();
+        this.resourceName = resourceName;
+        this.resultHeader = endpoint.getConfiguration().getResultHeader();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        this.queue = getAtomixEndpoint()
+            .getAtomix()
+            .getQueue(
+                resourceName,
+                new 
DistributedQueue.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)),
+                new 
DistributedQueue.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName)))
+            .join();
+
+
+        LOGGER.debug("Subscribe to events for queue: {}", resourceName);
+        this.listeners.add(this.queue.onAdd(this::onEvent).join());
+        this.listeners.add(this.queue.onRemove(this::onEvent).join());
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // close listeners
+        listeners.forEach(Listener::close);
+
+        super.doStart();
+    }
+
+    // ********************************************
+    // Event handler
+    // ********************************************
+
+    private void onEvent(DistributedQueue.ValueEvent<Object> event) {
+        Exchange exchange = getEndpoint().createExchange();
+        exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, 
event.type());
+
+        if (resultHeader == null) {
+            exchange.getIn().setBody(event.value());
+        } else {
+            exchange.getIn().setHeader(resultHeader, event.value());
+        }
+
+        try {
+            getProcessor().process(exchange);
+        } catch (Exception e) {
+            getExceptionHandler().handleException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueEndpoint.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueEndpoint.java
new file mode 100644
index 0000000..4a16c1a
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueEndpoint.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.queue;
+
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientEndpoint;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+
+@UriEndpoint(
+    firstVersion = "2.20.0",
+    scheme = "atomix-queue",
+    title = "Atomix Queue",
+    syntax = "atomix-queue:queueName",
+    consumerClass = AtomixQueueConsumer.class,
+    label = "clustering")
+final class AtomixQueueEndpoint extends 
AbstractAtomixClientEndpoint<AtomixQueueComponent, AtomixQueueConfiguration> {
+    @UriParam
+    private AtomixQueueConfiguration configuration;
+
+    public AtomixQueueEndpoint(String uri, AtomixQueueComponent component, 
String resourceName) {
+        super(uri, component, resourceName);
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new AtomixQueueProducer(this);
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new AtomixQueueConsumer(this, processor, getResourceName());
+    }
+
+    @Override
+    public AtomixQueueConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    @Override
+    public void setConfiguration(AtomixQueueConfiguration configuration) {
+        this.configuration = configuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java
new file mode 100644
index 0000000..c207e34
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.queue;
+
+import io.atomix.collections.DistributedQueue;
+import io.atomix.resource.ReadConsistency;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.InvokeOnHeader;
+import org.apache.camel.Message;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer;
+import org.apache.camel.util.ObjectHelper;
+
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE;
+
+final class AtomixQueueProducer extends 
AbstractAtomixClientProducer<AtomixQueueEndpoint, DistributedQueue> {
+    private final AtomixQueueConfiguration configuration;
+
+    protected AtomixQueueProducer(AtomixQueueEndpoint endpoint) {
+        super(endpoint);
+        this.configuration = endpoint.getConfiguration();
+    }
+
+    // *********************************
+    // Handlers
+    // *********************************
+
+    @InvokeOnHeader("ADD")
+    boolean onAdd(Message message, AsyncCallback callback) throws Exception {
+        final DistributedQueue<Object> queue = getResource(message);
+        final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, 
Object.class);
+
+        ObjectHelper.notNull(val, RESOURCE_VALUE);
+
+        queue.add(val).thenAccept(
+            result -> processResult(message, callback, result)
+        );
+
+        return false;
+    }
+
+    @InvokeOnHeader("OFFER")
+    boolean onOffer(Message message, AsyncCallback callback) throws Exception {
+        final DistributedQueue<Object> queue = getResource(message);
+        final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, 
Object.class);
+
+        ObjectHelper.notNull(val, RESOURCE_VALUE);
+
+        queue.offer(val).thenAccept(
+            result -> processResult(message, callback, result)
+        );
+
+        return false;
+    }
+
+    @InvokeOnHeader("PEEK")
+    boolean onPeek(Message message, AsyncCallback callback) throws Exception {
+        final DistributedQueue<Object> queue = getResource(message);
+
+        queue.peek().thenAccept(
+            result -> processResult(message, callback, result)
+        );
+
+        return false;
+    }
+
+    @InvokeOnHeader("POLL")
+    boolean onPoll(Message message, AsyncCallback callback) throws Exception {
+        final DistributedQueue<Object> queue = getResource(message);
+
+        queue.poll().thenAccept(
+            result -> processResult(message, callback, result)
+        );
+
+        return false;
+    }
+
+    @InvokeOnHeader("CLEAR")
+    boolean onClear(Message message, AsyncCallback callback) throws Exception {
+        final DistributedQueue<Object> queue = getResource(message);
+
+        queue.clear().thenAccept(
+            result -> processResult(message, callback, result)
+        );
+
+        return false;
+    }
+
+    @InvokeOnHeader("CONTAINS")
+    boolean onContains(Message message, AsyncCallback callback) throws 
Exception {
+        final DistributedQueue<Object> queue = getResource(message);
+        final ReadConsistency consistency = 
message.getHeader(RESOURCE_READ_CONSISTENCY,  
configuration::getReadConsistency, ReadConsistency.class);
+        final Object value = message.getHeader(RESOURCE_VALUE, 
message::getBody, Object.class);
+
+        ObjectHelper.notNull(value, RESOURCE_VALUE);
+
+        if (consistency != null) {
+            queue.contains(value, consistency).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            queue.contains(value).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    @InvokeOnHeader("IS_EMPTY")
+    boolean onIsEmpty(Message message, AsyncCallback callback) throws 
Exception {
+        final DistributedQueue<Object> queue = getResource(message);
+        final ReadConsistency consistency = 
message.getHeader(RESOURCE_READ_CONSISTENCY,  
configuration::getReadConsistency, ReadConsistency.class);
+
+        if (consistency != null) {
+            queue.isEmpty(consistency).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            queue.isEmpty().thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    @InvokeOnHeader("REMOVE")
+    boolean onRemove(Message message, AsyncCallback callback) throws Exception 
{
+        final DistributedQueue<Object> queue = getResource(message);
+        final Object value = message.getHeader(RESOURCE_VALUE, 
message::getBody, Object.class);
+
+        if (value == null) {
+            queue.remove().thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            queue.remove(value).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    @InvokeOnHeader("SIZE")
+    boolean onSize(Message message, AsyncCallback callback) throws Exception {
+        final DistributedQueue<Object> queue = getResource(message);
+        final ReadConsistency consistency = 
message.getHeader(RESOURCE_READ_CONSISTENCY,  
configuration::getReadConsistency, ReadConsistency.class);
+
+        if (consistency != null) {
+            queue.size(consistency).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            queue.size().thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    // *********************************
+    // Implementation
+    // *********************************
+
+    @Override
+    protected String getProcessorKey(Message message) {
+        return message.getHeader(RESOURCE_ACTION, 
configuration::getDefaultAction, String.class);
+    }
+
+    @Override
+    protected String getResourceName(Message message) {
+        return message.getHeader(RESOURCE_NAME, 
getAtomixEndpoint()::getResourceName, String.class);
+    }
+
+    @Override
+    protected DistributedQueue<Object> createResource(String resourceName) {
+        return getAtomixEndpoint()
+            .getAtomix()
+            .getQueue(
+                resourceName,
+                new 
DistributedQueue.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)),
+                new 
DistributedQueue.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName)))
+            .join();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSet.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSet.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSet.java
new file mode 100644
index 0000000..ba5b24c
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSet.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.set;
+
+public final class AtomixSet {
+
+    public enum Action {
+        ADD,
+        CLEAR,
+        CONTAINS,
+        IS_EMPTY,
+        REMOVE,
+        SIZE
+    }
+
+    private AtomixSet() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetComponent.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetComponent.java
new file mode 100644
index 0000000..d91de4b
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetComponent.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.set;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientComponent;
+
+public final class AtomixSetComponent extends 
AbstractAtomixClientComponent<AtomixSetConfiguration> {
+    private AtomixSetConfiguration configuration = new 
AtomixSetConfiguration();
+
+    public AtomixSetComponent() {
+        super();
+    }
+
+    public AtomixSetComponent(CamelContext camelContext) {
+        super(camelContext);
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
+        AtomixSetConfiguration configuration = this.configuration.copy();
+
+        // Bind options to the configuration object
+        setConfigurationProperties(configuration, parameters);
+
+        AtomixSetEndpoint endpoint = new AtomixSetEndpoint(uri, this, 
remaining);
+        endpoint.setConfiguration(configuration);
+
+        setProperties(endpoint, parameters);
+
+        return endpoint;
+    }
+
+    // **********************************************
+    // Properties
+    // **********************************************
+
+    public AtomixSetConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    /**
+     * The shared component configuration
+     */
+    public void setConfiguration(AtomixSetConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    @Override
+    protected AtomixSetConfiguration getComponentConfiguration() {
+        return getConfiguration();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConfiguration.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConfiguration.java
new file mode 100644
index 0000000..e7a67c9
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConfiguration.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.set;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.atomix.client.AtomixClientConfiguration;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+
+@UriParams
+public class AtomixSetConfiguration extends AtomixClientConfiguration {
+    @UriParam(defaultValue = "ADD")
+    private AtomixSet.Action defaultAction = AtomixSet.Action.ADD;
+    @UriParam
+    private long ttl;
+
+    // ****************************************
+    // Properties
+    // ****************************************
+
+    public AtomixSet.Action getDefaultAction() {
+        return defaultAction;
+    }
+
+    /**
+     * The default action.
+     */
+    public void setDefaultAction(AtomixSet.Action defaultAction) {
+        this.defaultAction = defaultAction;
+    }
+
+    public long getTtl() {
+        return ttl;
+    }
+
+    /**
+     * The resource ttl.
+     */
+    public void setTtl(long ttl) {
+        this.ttl = ttl;
+    }
+
+    // ****************************************
+    // Copy
+    // ****************************************
+
+    public AtomixSetConfiguration copy() {
+        try {
+            return (AtomixSetConfiguration) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java
new file mode 100644
index 0000000..e20a719
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.set;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.atomix.catalyst.concurrent.Listener;
+import io.atomix.collections.DistributedSet;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientConsumer;
+import org.apache.camel.component.atomix.client.AtomixClientConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class AtomixSetConsumer extends 
AbstractAtomixClientConsumer<AtomixSetEndpoint> {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AtomixSetConsumer.class);
+
+    private final List<Listener<DistributedSet.ValueEvent<Object>>> listeners;
+    private final String resourceName;
+    private final String resultHeader;
+    private DistributedSet<Object> set;
+
+    public AtomixSetConsumer(AtomixSetEndpoint endpoint, Processor processor, 
String resourceName) {
+        super(endpoint, processor);
+        this.listeners = new ArrayList<>();
+        this.resourceName = resourceName;
+        this.resultHeader = endpoint.getConfiguration().getResultHeader();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        this.set = getAtomixEndpoint()
+            .getAtomix()
+            .getSet(
+                resourceName,
+                new 
DistributedSet.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)),
+                new 
DistributedSet.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName)))
+            .join();
+
+
+        LOGGER.debug("Subscribe to events for set: {}", resourceName);
+        this.listeners.add(this.set.onAdd(this::onEvent).join());
+        this.listeners.add(this.set.onRemove(this::onEvent).join());
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // close listeners
+        listeners.forEach(Listener::close);
+
+        super.doStart();
+    }
+
+    // ********************************************
+    // Event handler
+    // ********************************************
+
+    private void onEvent(DistributedSet.ValueEvent<Object> event) {
+        Exchange exchange = getEndpoint().createExchange();
+        exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, 
event.type());
+
+        if (resultHeader == null) {
+            exchange.getIn().setBody(event.value());
+        } else {
+            exchange.getIn().setHeader(resultHeader, event.value());
+        }
+
+        try {
+            getProcessor().process(exchange);
+        } catch (Exception e) {
+            getExceptionHandler().handleException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetEndpoint.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetEndpoint.java
new file mode 100644
index 0000000..9a1c64c
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetEndpoint.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.set;
+
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientEndpoint;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+
+@UriEndpoint(
+    firstVersion = "2.20.0",
+    scheme = "atomix-set",
+    title = "Atomix Set",
+    syntax = "atomix-set:setName",
+    consumerClass = AtomixSetConsumer.class,
+    label = "clustering")
+final class AtomixSetEndpoint extends 
AbstractAtomixClientEndpoint<AtomixSetComponent, AtomixSetConfiguration> {
+    @UriParam
+    private AtomixSetConfiguration configuration;
+
+    public AtomixSetEndpoint(String uri, AtomixSetComponent component, String 
resourceName) {
+        super(uri, component, resourceName);
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new AtomixSetProducer(this);
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new AtomixSetConsumer(this, processor, getResourceName());
+    }
+
+    @Override
+    public AtomixSetConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    @Override
+    public void setConfiguration(AtomixSetConfiguration configuration) {
+        this.configuration = configuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java
new file mode 100644
index 0000000..87a4406
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.set;
+
+import java.time.Duration;
+
+import io.atomix.collections.DistributedSet;
+import io.atomix.resource.ReadConsistency;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.InvokeOnHeader;
+import org.apache.camel.Message;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer;
+import org.apache.camel.util.ObjectHelper;
+
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_TTL;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE;
+
+final class AtomixSetProducer extends 
AbstractAtomixClientProducer<AtomixSetEndpoint, DistributedSet> {
+    private final AtomixSetConfiguration configuration;
+
+    protected AtomixSetProducer(AtomixSetEndpoint endpoint) {
+        super(endpoint);
+        this.configuration = endpoint.getConfiguration();
+    }
+
+    // *********************************
+    // Handlers
+    // *********************************
+
+    @InvokeOnHeader("ADD")
+    boolean onAdd(Message message, AsyncCallback callback) throws Exception {
+        final DistributedSet<Object> set = getResource(message);
+        final long ttl = message.getHeader(RESOURCE_TTL, 
configuration::getTtl, long.class);
+        final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, 
Object.class);
+
+        ObjectHelper.notNull(val, RESOURCE_VALUE);
+
+        if (ttl > 0) {
+            set.add(val, Duration.ofMillis(ttl)).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            set.add(val).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    @InvokeOnHeader("CLEAR")
+    boolean onClear(Message message, AsyncCallback callback) throws Exception {
+        final DistributedSet<Object> set = getResource(message);
+
+        set.clear().thenAccept(
+            result -> processResult(message, callback, result)
+        );
+
+        return false;
+    }
+
+    @InvokeOnHeader("CONTAINS")
+    boolean onContains(Message message, AsyncCallback callback) throws 
Exception {
+        final DistributedSet<Object> set = getResource(message);
+        final ReadConsistency consistency = 
message.getHeader(RESOURCE_READ_CONSISTENCY,  
configuration::getReadConsistency, ReadConsistency.class);
+        final Object value = message.getHeader(RESOURCE_VALUE, 
message::getBody, Object.class);
+
+        ObjectHelper.notNull(value, RESOURCE_VALUE);
+
+        if (consistency != null) {
+            set.contains(value, consistency).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            set.contains(value).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    @InvokeOnHeader("IS_EMPTY")
+    boolean onIsEmpty(Message message, AsyncCallback callback) throws 
Exception {
+        final DistributedSet<Object> set = getResource(message);
+        final ReadConsistency consistency = 
message.getHeader(RESOURCE_READ_CONSISTENCY,  
configuration::getReadConsistency, ReadConsistency.class);
+
+        if (consistency != null) {
+            set.isEmpty(consistency).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            set.isEmpty().thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    @InvokeOnHeader("REMOVE")
+    boolean onRemove(Message message, AsyncCallback callback) throws Exception 
{
+        final DistributedSet<Object> set = getResource(message);
+        final Object value = message.getHeader(RESOURCE_VALUE, 
message::getBody, Object.class);
+
+        ObjectHelper.notNull(value, RESOURCE_VALUE);
+
+        set.remove(value).thenAccept(
+            result -> processResult(message, callback, result)
+        );
+
+        return false;
+    }
+
+    @InvokeOnHeader("SIZE")
+    boolean onSize(Message message, AsyncCallback callback) throws Exception {
+        final DistributedSet<Object> set = getResource(message);
+        final ReadConsistency consistency = 
message.getHeader(RESOURCE_READ_CONSISTENCY,  
configuration::getReadConsistency, ReadConsistency.class);
+
+        if (consistency != null) {
+            set.size(consistency).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            set.size().thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    // *********************************
+    // Implementation
+    // *********************************
+
+    @Override
+    protected String getProcessorKey(Message message) {
+        return message.getHeader(RESOURCE_ACTION, 
configuration::getDefaultAction, String.class);
+    }
+
+    @Override
+    protected String getResourceName(Message message) {
+        return message.getHeader(RESOURCE_NAME, 
getAtomixEndpoint()::getResourceName, String.class);
+    }
+
+    @Override
+    protected DistributedSet<Object> createResource(String resourceName) {
+        return getAtomixEndpoint()
+            .getAtomix()
+            .getSet(
+                resourceName,
+                new 
DistributedSet.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)),
+                new 
DistributedSet.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName)))
+            .join();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValue.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValue.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValue.java
new file mode 100644
index 0000000..0f082f6
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValue.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.value;
+
+public final class AtomixValue {
+
+    public enum Action {
+        SET,
+        GET,
+        GET_AND_SET,
+        COMPARE_AND_SET
+    }
+
+    private AtomixValue() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueComponent.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueComponent.java
new file mode 100644
index 0000000..d6d7ffb
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueComponent.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.value;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientComponent;
+
+public final class AtomixValueComponent extends 
AbstractAtomixClientComponent<AtomixValueConfiguration> {
+    private AtomixValueConfiguration configuration = new 
AtomixValueConfiguration();
+
+    public AtomixValueComponent() {
+        super();
+    }
+
+    public AtomixValueComponent(CamelContext camelContext) {
+        super(camelContext);
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
+        AtomixValueConfiguration configuration = this.configuration.copy();
+
+        // Bind options to the configuration object
+        setConfigurationProperties(configuration, parameters);
+
+        AtomixValueEndpoint endpoint = new AtomixValueEndpoint(uri, this, 
remaining);
+        endpoint.setConfiguration(configuration);
+
+        setProperties(endpoint, parameters);
+
+        return endpoint;
+    }
+
+    // **********************************************
+    // Properties
+    // **********************************************
+
+    public AtomixValueConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    /**
+     * The shared component configuration
+     */
+    public void setConfiguration(AtomixValueConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    @Override
+    protected AtomixValueConfiguration getComponentConfiguration() {
+        return getConfiguration();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConfiguration.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConfiguration.java
new file mode 100644
index 0000000..79f8aa7
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConfiguration.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.value;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.atomix.client.AtomixClientConfiguration;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+
+@UriParams
+public class AtomixValueConfiguration extends AtomixClientConfiguration {
+    @UriParam(defaultValue = "SET")
+    private AtomixValue.Action defaultAction = AtomixValue.Action.SET;
+    @UriParam
+    private long ttl;
+
+    // ****************************************
+    // Properties
+    // ****************************************
+
+    public AtomixValue.Action getDefaultAction() {
+        return defaultAction;
+    }
+
+    /**
+     * The default action.
+     */
+    public void setDefaultAction(AtomixValue.Action defaultAction) {
+        this.defaultAction = defaultAction;
+    }
+
+    public long getTtl() {
+        return ttl;
+    }
+
+    /**
+     * The resource ttl.
+     */
+    public void setTtl(long ttl) {
+        this.ttl = ttl;
+    }
+
+    // ****************************************
+    // Copy
+    // ****************************************
+
+    public AtomixValueConfiguration copy() {
+        try {
+            return (AtomixValueConfiguration) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java
new file mode 100644
index 0000000..ecb759e
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.value;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.atomix.catalyst.concurrent.Listener;
+import io.atomix.variables.DistributedValue;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientConsumer;
+import org.apache.camel.component.atomix.client.AtomixClientConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class AtomixValueConsumer extends 
AbstractAtomixClientConsumer<AtomixValueEndpoint> {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AtomixValueConsumer.class);
+
+    private final List<Listener<DistributedValue.ChangeEvent<Object>>> 
listeners;
+    private final String resourceName;
+    private final String resultHeader;
+    private DistributedValue<Object> value;
+
+    public AtomixValueConsumer(AtomixValueEndpoint endpoint, Processor 
processor, String resourceName) {
+        super(endpoint, processor);
+        this.listeners = new ArrayList<>();
+        this.resourceName = resourceName;
+        this.resultHeader = endpoint.getConfiguration().getResultHeader();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        this.value = getAtomixEndpoint()
+            .getAtomix()
+            .getValue(
+                resourceName,
+                new 
DistributedValue.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)),
+                new 
DistributedValue.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName)))
+            .join();
+
+
+        LOGGER.debug("Subscribe to events for queue: {}", resourceName);
+        this.listeners.add(this.value.onChange(this::onEvent).join());
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // close listeners
+        listeners.forEach(Listener::close);
+
+        super.doStart();
+    }
+
+    // ********************************************
+    // Event handler
+    // ********************************************
+
+    private void onEvent(DistributedValue.ChangeEvent<Object> event) {
+        Exchange exchange = getEndpoint().createExchange();
+        exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, 
event.type());
+        exchange.getIn().setHeader(AtomixClientConstants.RESOURCE_OLD_VALUE, 
event.oldValue());
+
+        if (resultHeader == null) {
+            exchange.getIn().setBody(event.newValue());
+        } else {
+            exchange.getIn().setHeader(resultHeader, event.newValue());
+        }
+
+        try {
+            getProcessor().process(exchange);
+        } catch (Exception e) {
+            getExceptionHandler().handleException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueEndpoint.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueEndpoint.java
new file mode 100644
index 0000000..e4478e0
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueEndpoint.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.value;
+
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientEndpoint;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+
+@UriEndpoint(
+    firstVersion = "2.20.0",
+    scheme = "atomix-value",
+    title = "Atomix Value",
+    syntax = "atomix-value:valueName",
+    consumerClass = AtomixValueConsumer.class,
+    label = "clustering")
+final class AtomixValueEndpoint extends 
AbstractAtomixClientEndpoint<AtomixValueComponent, AtomixValueConfiguration> {
+    @UriParam
+    private AtomixValueConfiguration configuration;
+
+    public AtomixValueEndpoint(String uri, AtomixValueComponent component, 
String resourceName) {
+        super(uri, component, resourceName);
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new AtomixValueProducer(this);
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new AtomixValueConsumer(this, processor, getResourceName());
+    }
+
+    @Override
+    public AtomixValueConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    @Override
+    public void setConfiguration(AtomixValueConfiguration configuration) {
+        this.configuration = configuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java
new file mode 100644
index 0000000..c6ca5c4
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.value;
+
+import java.time.Duration;
+
+import io.atomix.resource.ReadConsistency;
+import io.atomix.variables.DistributedValue;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.InvokeOnHeader;
+import org.apache.camel.Message;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer;
+import org.apache.camel.util.ObjectHelper;
+
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_OLD_VALUE;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_TTL;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE;
+
+final class AtomixValueProducer extends 
AbstractAtomixClientProducer<AtomixValueEndpoint, DistributedValue> {
+    private final AtomixValueConfiguration configuration;
+
+    protected AtomixValueProducer(AtomixValueEndpoint endpoint) {
+        super(endpoint);
+        this.configuration = endpoint.getConfiguration();
+    }
+
+    // *********************************
+    // Handlers
+    // *********************************
+
+    @InvokeOnHeader("SET")
+    boolean onSet(Message message, AsyncCallback callback) throws Exception {
+        final DistributedValue<Object> value = getResource(message);
+        final long ttl = message.getHeader(RESOURCE_TTL, 
configuration::getTtl, long.class);
+        final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, 
Object.class);
+
+        ObjectHelper.notNull(val, RESOURCE_VALUE);
+
+        if (ttl > 0) {
+            value.set(val, Duration.ofMillis(ttl)).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            value.set(val).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    @InvokeOnHeader("GET")
+    boolean onGet(Message message, AsyncCallback callback) throws Exception {
+        final DistributedValue<Object> value = getResource(message);
+        final ReadConsistency consistency = 
message.getHeader(RESOURCE_READ_CONSISTENCY,  
configuration::getReadConsistency, ReadConsistency.class);
+
+        if (consistency != null) {
+            value.get(consistency).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            value.get().thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    @InvokeOnHeader("GET_AND_SET")
+    boolean onGetAndSet(Message message, AsyncCallback callback) throws 
Exception {
+        final DistributedValue<Object> value = getResource(message);
+        final long ttl = message.getHeader(RESOURCE_TTL, 
configuration::getTtl, long.class);
+        final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, 
Object.class);
+
+        ObjectHelper.notNull(val, RESOURCE_VALUE);
+
+        if (ttl > 0) {
+            value.getAndSet(val, Duration.ofMillis(ttl)).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            value.getAndSet(val).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    @InvokeOnHeader("COMPARE_AND_SET")
+    boolean onCompareAndSet(Message message, AsyncCallback callback) throws 
Exception {
+        final DistributedValue<Object> value = getResource(message);
+        final long ttl = message.getHeader(RESOURCE_TTL, 
configuration::getTtl, long.class);
+        final Object newVal = message.getHeader(RESOURCE_VALUE, 
message::getBody, Object.class);
+        final Object oldVal = message.getHeader(RESOURCE_OLD_VALUE, 
Object.class);
+
+        ObjectHelper.notNull(newVal, RESOURCE_VALUE);
+        ObjectHelper.notNull(oldVal, RESOURCE_OLD_VALUE);
+
+        if (ttl > 0) {
+            value.compareAndSet(oldVal, newVal, 
Duration.ofMillis(ttl)).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            value.compareAndSet(oldVal, newVal).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    // *********************************
+    // Implementation
+    // *********************************
+
+    @Override
+    protected String getProcessorKey(Message message) {
+        return message.getHeader(RESOURCE_ACTION, 
configuration::getDefaultAction, String.class);
+    }
+
+    @Override
+    protected String getResourceName(Message message) {
+        return message.getHeader(RESOURCE_NAME, 
getAtomixEndpoint()::getResourceName, String.class);
+    }
+
+    @Override
+    protected DistributedValue<Object> createResource(String resourceName) {
+        return getAtomixEndpoint()
+            .getAtomix()
+            .getValue(
+                resourceName,
+                new 
DistributedValue.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)),
+                new 
DistributedValue.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName)))
+            .join();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterComponent.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterComponent.java
deleted file mode 100644
index 8db6487..0000000
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterComponent.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.atomix.cluster;
-
-import java.util.Map;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Endpoint;
-import org.apache.camel.impl.DefaultComponent;
-
-public class AtomixClusterComponent extends DefaultComponent {
-    public AtomixClusterComponent() {
-    }
-
-    public AtomixClusterComponent(CamelContext camelContext) {
-        super(camelContext);
-    }
-
-    @Override
-    protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
-        throw new UnsupportedOperationException("Not yet implemented");
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
deleted file mode 100644
index b05230f..0000000
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.atomix.cluster;
-
-import io.atomix.AtomixReplica;
-import io.atomix.catalyst.transport.Transport;
-import io.atomix.copycat.server.storage.StorageLevel;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.component.atomix.AtomixConfiguration;
-import org.apache.camel.spi.UriParam;
-import org.apache.camel.spi.UriParams;
-
-@UriParams
-public class AtomixClusterConfiguration extends 
AtomixConfiguration<AtomixReplica> implements Cloneable {
-
-    @UriParam
-    private Class<? extends Transport> clientTransport;
-
-    @UriParam
-    private Class<? extends Transport> serverTransport;
-
-    @UriParam
-    private String storagePath;
-
-    @UriParam(defaultValue = "MEMORY")
-    private StorageLevel storageLevel = StorageLevel.MEMORY;
-
-    public AtomixClusterConfiguration() {
-    }
-
-    // ******************************************
-    // Properties
-    // ******************************************
-
-
-    public Class<? extends Transport> getClientTransport() {
-        return clientTransport;
-    }
-
-    /**
-     * The client transport
-     */
-    public void setClientTransport(Class<? extends Transport> clientTransport) 
{
-        this.clientTransport = clientTransport;
-    }
-
-    public Class<? extends Transport> getServerTransport() {
-        return serverTransport;
-    }
-
-    /**
-     * The server transport
-     */
-    public void setServerTransport(Class<? extends Transport> serverTransport) 
{
-        this.serverTransport = serverTransport;
-    }
-
-    public String getStoragePath() {
-        return storagePath;
-    }
-
-    /**
-     * Sets the log directory.
-     */
-    public void setStoragePath(String storagePath) {
-        this.storagePath = storagePath;
-    }
-
-    public StorageLevel getStorageLevel() {
-        return storageLevel;
-    }
-
-    /**
-     * Sets the log storage level.
-     */
-    public void setStorageLevel(StorageLevel storageLevel) {
-        this.storageLevel = storageLevel;
-    }
-
-    // ****************************************
-    // Copy
-    // ****************************************
-
-    public AtomixClusterConfiguration copy() {
-        try {
-            return (AtomixClusterConfiguration) super.clone();
-        } catch (CloneNotSupportedException e) {
-            throw new RuntimeCamelException(e);
-        }
-    }
-}
\ No newline at end of file

Reply via email to