http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java new file mode 100644 index 0000000..696b6eb --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.multimap; + +import java.time.Duration; + +import io.atomix.collections.DistributedMultiMap; +import io.atomix.resource.ReadConsistency; +import org.apache.camel.AsyncCallback; +import org.apache.camel.InvokeOnHeader; +import org.apache.camel.Message; +import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer; +import org.apache.camel.util.ObjectHelper; + +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_KEY; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_TTL; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE; + +final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<AtomixMultiMapEndpoint, DistributedMultiMap> { + private final AtomixMultiMapConfiguration configuration; + + protected AtomixMultiMapProducer(AtomixMultiMapEndpoint endpoint) { + super(endpoint); + this.configuration = endpoint.getConfiguration(); + } + + // ********************************* + // Handlers + // ********************************* + + @InvokeOnHeader("PUT") + boolean onPut(Message message, AsyncCallback callback) throws Exception { + final DistributedMultiMap<Object, Object> map = getResource(message); + final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class); + final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + final long ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, long.class); + + ObjectHelper.notNull(key, RESOURCE_KEY); + ObjectHelper.notNull(val, RESOURCE_VALUE); + + if (ttl > 0) { + map.put(key, val, Duration.ofMillis(ttl)).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.put(key, val).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("GET") + boolean onGet(Message message, AsyncCallback callback) throws Exception { + final DistributedMultiMap<Object, Object> map = getResource(message); + final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + ObjectHelper.notNull(key, RESOURCE_KEY); + + if (consistency != null) { + map.get(key, consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.get(key).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("CLEAR") + boolean onClear(Message message, AsyncCallback callback) throws Exception { + final DistributedMultiMap<Object, Object> map = getResource(message); + + map.clear().thenAccept( + result -> processResult(message, callback, result) + ); + + return false; + } + + @InvokeOnHeader("SIZE") + boolean onSize(Message message, AsyncCallback callback) throws Exception { + final DistributedMultiMap<Object, Object> map = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); + + if (consistency != null) { + if (key != null) { + map.size(key, consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.size(consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } + } else { + if (key != null) { + map.size(key).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.size().thenAccept( + result -> processResult(message, callback, result) + ); + } + } + + return false; + } + + @InvokeOnHeader("IS_EMPTY") + boolean onIsEmpty(Message message, AsyncCallback callback) throws Exception { + final DistributedMultiMap<Object, Object> map = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + if (consistency != null) { + map.isEmpty(consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.isEmpty().thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("CONTAINS_KEY") + boolean onContainsKey(Message message, AsyncCallback callback) throws Exception { + final DistributedMultiMap<Object, Object> map = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); + + ObjectHelper.notNull(key, RESOURCE_KEY); + + if (consistency != null) { + map.containsKey(key, consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.containsKey(key).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + +// @InvokeOnHeader("CONTAINS_VALUE") +// boolean onContainsValue(Message message, AsyncCallback callback) throws Exception { +// final DistributedMultiMap<Object, Object> map = getResource(message); +// final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); +// final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); +// +// ObjectHelper.notNull(value, RESOURCE_VALUE); +// +// if (consistency != null) { +// map.containsValue(value, consistency).thenAccept( +// result -> processResult(message, callback, result) +// ); +// } else { +// map.containsValue(value).thenAccept( +// result -> processResult(message, callback, result) +// ); +// } +// +// return false; +// } + +// @InvokeOnHeader("CONTAINS_ENTRY") +// boolean onContainsEntry(Message message, AsyncCallback callback) throws Exception { +// final DistributedMultiMap<Object, Object> map = getResource(message); +// final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); +// final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); +// final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); +// +// ObjectHelper.notNull(key, RESOURCE_VALUE); +// ObjectHelper.notNull(value, RESOURCE_KEY); +// +// if (consistency != null) { +// map.containsEntry(key, value, consistency).thenAccept( +// result -> processResult(message, callback, result) +// ); +// } else { +// map.containsEntry(key, value).thenAccept( +// result -> processResult(message, callback, result) +// ); +// } +// +// return false; +// } + + @InvokeOnHeader("REMOVE") + boolean onRemove(Message message, AsyncCallback callback) throws Exception { + final DistributedMultiMap<Object, Object> map = getResource(message); + final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); + final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + ObjectHelper.notNull(key, RESOURCE_KEY); + + if (value != null) { + map.remove(key, value).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.remove(key).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("REMOVE_VALUE") + boolean onRemoveValue(Message message, AsyncCallback callback) throws Exception { + final DistributedMultiMap<Object, Object> map = getResource(message); + final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + ObjectHelper.notNull(value, RESOURCE_VALUE); + + map.removeValue(value).thenAccept( + result -> processResult(message, callback, result) + ); + + return false; + } + + // ********************************* + // Implementation + // ********************************* + + + @Override + protected String getProcessorKey(Message message) { + return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class); + } + + @Override + protected String getResourceName(Message message) { + return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class); + } + + @Override + protected DistributedMultiMap<Object, Object> createResource(String resourceName) { + return getAtomixEndpoint() + .getAtomix() + .getMultiMap( + resourceName, + new DistributedMultiMap.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)), + new DistributedMultiMap.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName))) + .join(); + } +}
http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueue.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueue.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueue.java new file mode 100644 index 0000000..edc8cb5 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueue.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.queue; + +public final class AtomixQueue { + + public enum Action { + ADD, + OFFER, + PEEK, + POLL, + CLEAR, + CONTAINS, + IS_EMPTY, + REMOVE, + SIZE + } + + private AtomixQueue() { + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueComponent.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueComponent.java new file mode 100644 index 0000000..083a851 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueComponent.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.queue; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.component.atomix.client.AbstractAtomixClientComponent; + +public final class AtomixQueueComponent extends AbstractAtomixClientComponent<AtomixQueueConfiguration> { + private AtomixQueueConfiguration configuration = new AtomixQueueConfiguration(); + + public AtomixQueueComponent() { + super(); + } + + public AtomixQueueComponent(CamelContext camelContext) { + super(camelContext); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + AtomixQueueConfiguration configuration = this.configuration.copy(); + + // Bind options to the configuration object + setConfigurationProperties(configuration, parameters); + + AtomixQueueEndpoint endpoint = new AtomixQueueEndpoint(uri, this, remaining); + endpoint.setConfiguration(configuration); + + setProperties(endpoint, parameters); + + return endpoint; + } + + // ********************************************** + // Properties + // ********************************************** + + public AtomixQueueConfiguration getConfiguration() { + return this.configuration; + } + + /** + * The shared component configuration + */ + public void setConfiguration(AtomixQueueConfiguration configuration) { + this.configuration = configuration; + } + + @Override + protected AtomixQueueConfiguration getComponentConfiguration() { + return getConfiguration(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConfiguration.java new file mode 100644 index 0000000..0175e69 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConfiguration.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.queue; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.atomix.client.AtomixClientConfiguration; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriParams; + +@UriParams +public class AtomixQueueConfiguration extends AtomixClientConfiguration { + @UriParam(defaultValue = "ADD") + private AtomixQueue.Action defaultAction = AtomixQueue.Action.ADD; + + // **************************************** + // Properties + // **************************************** + + public AtomixQueue.Action getDefaultAction() { + return defaultAction; + } + + /** + * The default action. + */ + public void setDefaultAction(AtomixQueue.Action defaultAction) { + this.defaultAction = defaultAction; + } + + // **************************************** + // Copy + // **************************************** + + public AtomixQueueConfiguration copy() { + try { + return (AtomixQueueConfiguration) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java new file mode 100644 index 0000000..eef6eec --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.queue; + +import java.util.ArrayList; +import java.util.List; + +import io.atomix.catalyst.concurrent.Listener; +import io.atomix.collections.DistributedQueue; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.atomix.client.AbstractAtomixClientConsumer; +import org.apache.camel.component.atomix.client.AtomixClientConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class AtomixQueueConsumer extends AbstractAtomixClientConsumer<AtomixQueueEndpoint> { + private static final Logger LOGGER = LoggerFactory.getLogger(AtomixQueueConsumer.class); + + private final List<Listener<DistributedQueue.ValueEvent<Object>>> listeners; + private final String resourceName; + private final String resultHeader; + private DistributedQueue<Object> queue; + + public AtomixQueueConsumer(AtomixQueueEndpoint endpoint, Processor processor, String resourceName) { + super(endpoint, processor); + this.listeners = new ArrayList<>(); + this.resourceName = resourceName; + this.resultHeader = endpoint.getConfiguration().getResultHeader(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + this.queue = getAtomixEndpoint() + .getAtomix() + .getQueue( + resourceName, + new DistributedQueue.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)), + new DistributedQueue.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName))) + .join(); + + + LOGGER.debug("Subscribe to events for queue: {}", resourceName); + this.listeners.add(this.queue.onAdd(this::onEvent).join()); + this.listeners.add(this.queue.onRemove(this::onEvent).join()); + } + + @Override + protected void doStop() throws Exception { + // close listeners + listeners.forEach(Listener::close); + + super.doStart(); + } + + // ******************************************** + // Event handler + // ******************************************** + + private void onEvent(DistributedQueue.ValueEvent<Object> event) { + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, event.type()); + + if (resultHeader == null) { + exchange.getIn().setBody(event.value()); + } else { + exchange.getIn().setHeader(resultHeader, event.value()); + } + + try { + getProcessor().process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueEndpoint.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueEndpoint.java new file mode 100644 index 0000000..4a16c1a --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueEndpoint.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.queue; + + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.atomix.client.AbstractAtomixClientEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; + +@UriEndpoint( + firstVersion = "2.20.0", + scheme = "atomix-queue", + title = "Atomix Queue", + syntax = "atomix-queue:queueName", + consumerClass = AtomixQueueConsumer.class, + label = "clustering") +final class AtomixQueueEndpoint extends AbstractAtomixClientEndpoint<AtomixQueueComponent, AtomixQueueConfiguration> { + @UriParam + private AtomixQueueConfiguration configuration; + + public AtomixQueueEndpoint(String uri, AtomixQueueComponent component, String resourceName) { + super(uri, component, resourceName); + } + + @Override + public Producer createProducer() throws Exception { + return new AtomixQueueProducer(this); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new AtomixQueueConsumer(this, processor, getResourceName()); + } + + @Override + public AtomixQueueConfiguration getConfiguration() { + return this.configuration; + } + + @Override + public void setConfiguration(AtomixQueueConfiguration configuration) { + this.configuration = configuration; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java new file mode 100644 index 0000000..c207e34 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.queue; + +import io.atomix.collections.DistributedQueue; +import io.atomix.resource.ReadConsistency; +import org.apache.camel.AsyncCallback; +import org.apache.camel.InvokeOnHeader; +import org.apache.camel.Message; +import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer; +import org.apache.camel.util.ObjectHelper; + +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE; + +final class AtomixQueueProducer extends AbstractAtomixClientProducer<AtomixQueueEndpoint, DistributedQueue> { + private final AtomixQueueConfiguration configuration; + + protected AtomixQueueProducer(AtomixQueueEndpoint endpoint) { + super(endpoint); + this.configuration = endpoint.getConfiguration(); + } + + // ********************************* + // Handlers + // ********************************* + + @InvokeOnHeader("ADD") + boolean onAdd(Message message, AsyncCallback callback) throws Exception { + final DistributedQueue<Object> queue = getResource(message); + final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + ObjectHelper.notNull(val, RESOURCE_VALUE); + + queue.add(val).thenAccept( + result -> processResult(message, callback, result) + ); + + return false; + } + + @InvokeOnHeader("OFFER") + boolean onOffer(Message message, AsyncCallback callback) throws Exception { + final DistributedQueue<Object> queue = getResource(message); + final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + ObjectHelper.notNull(val, RESOURCE_VALUE); + + queue.offer(val).thenAccept( + result -> processResult(message, callback, result) + ); + + return false; + } + + @InvokeOnHeader("PEEK") + boolean onPeek(Message message, AsyncCallback callback) throws Exception { + final DistributedQueue<Object> queue = getResource(message); + + queue.peek().thenAccept( + result -> processResult(message, callback, result) + ); + + return false; + } + + @InvokeOnHeader("POLL") + boolean onPoll(Message message, AsyncCallback callback) throws Exception { + final DistributedQueue<Object> queue = getResource(message); + + queue.poll().thenAccept( + result -> processResult(message, callback, result) + ); + + return false; + } + + @InvokeOnHeader("CLEAR") + boolean onClear(Message message, AsyncCallback callback) throws Exception { + final DistributedQueue<Object> queue = getResource(message); + + queue.clear().thenAccept( + result -> processResult(message, callback, result) + ); + + return false; + } + + @InvokeOnHeader("CONTAINS") + boolean onContains(Message message, AsyncCallback callback) throws Exception { + final DistributedQueue<Object> queue = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + ObjectHelper.notNull(value, RESOURCE_VALUE); + + if (consistency != null) { + queue.contains(value, consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + queue.contains(value).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("IS_EMPTY") + boolean onIsEmpty(Message message, AsyncCallback callback) throws Exception { + final DistributedQueue<Object> queue = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + if (consistency != null) { + queue.isEmpty(consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + queue.isEmpty().thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("REMOVE") + boolean onRemove(Message message, AsyncCallback callback) throws Exception { + final DistributedQueue<Object> queue = getResource(message); + final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + if (value == null) { + queue.remove().thenAccept( + result -> processResult(message, callback, result) + ); + } else { + queue.remove(value).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("SIZE") + boolean onSize(Message message, AsyncCallback callback) throws Exception { + final DistributedQueue<Object> queue = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + if (consistency != null) { + queue.size(consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + queue.size().thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + // ********************************* + // Implementation + // ********************************* + + @Override + protected String getProcessorKey(Message message) { + return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class); + } + + @Override + protected String getResourceName(Message message) { + return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class); + } + + @Override + protected DistributedQueue<Object> createResource(String resourceName) { + return getAtomixEndpoint() + .getAtomix() + .getQueue( + resourceName, + new DistributedQueue.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)), + new DistributedQueue.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName))) + .join(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSet.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSet.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSet.java new file mode 100644 index 0000000..ba5b24c --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSet.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.set; + +public final class AtomixSet { + + public enum Action { + ADD, + CLEAR, + CONTAINS, + IS_EMPTY, + REMOVE, + SIZE + } + + private AtomixSet() { + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetComponent.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetComponent.java new file mode 100644 index 0000000..d91de4b --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetComponent.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.set; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.component.atomix.client.AbstractAtomixClientComponent; + +public final class AtomixSetComponent extends AbstractAtomixClientComponent<AtomixSetConfiguration> { + private AtomixSetConfiguration configuration = new AtomixSetConfiguration(); + + public AtomixSetComponent() { + super(); + } + + public AtomixSetComponent(CamelContext camelContext) { + super(camelContext); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + AtomixSetConfiguration configuration = this.configuration.copy(); + + // Bind options to the configuration object + setConfigurationProperties(configuration, parameters); + + AtomixSetEndpoint endpoint = new AtomixSetEndpoint(uri, this, remaining); + endpoint.setConfiguration(configuration); + + setProperties(endpoint, parameters); + + return endpoint; + } + + // ********************************************** + // Properties + // ********************************************** + + public AtomixSetConfiguration getConfiguration() { + return this.configuration; + } + + /** + * The shared component configuration + */ + public void setConfiguration(AtomixSetConfiguration configuration) { + this.configuration = configuration; + } + + @Override + protected AtomixSetConfiguration getComponentConfiguration() { + return getConfiguration(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConfiguration.java new file mode 100644 index 0000000..e7a67c9 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConfiguration.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.set; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.atomix.client.AtomixClientConfiguration; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriParams; + +@UriParams +public class AtomixSetConfiguration extends AtomixClientConfiguration { + @UriParam(defaultValue = "ADD") + private AtomixSet.Action defaultAction = AtomixSet.Action.ADD; + @UriParam + private long ttl; + + // **************************************** + // Properties + // **************************************** + + public AtomixSet.Action getDefaultAction() { + return defaultAction; + } + + /** + * The default action. + */ + public void setDefaultAction(AtomixSet.Action defaultAction) { + this.defaultAction = defaultAction; + } + + public long getTtl() { + return ttl; + } + + /** + * The resource ttl. + */ + public void setTtl(long ttl) { + this.ttl = ttl; + } + + // **************************************** + // Copy + // **************************************** + + public AtomixSetConfiguration copy() { + try { + return (AtomixSetConfiguration) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java new file mode 100644 index 0000000..e20a719 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.set; + +import java.util.ArrayList; +import java.util.List; + +import io.atomix.catalyst.concurrent.Listener; +import io.atomix.collections.DistributedSet; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.atomix.client.AbstractAtomixClientConsumer; +import org.apache.camel.component.atomix.client.AtomixClientConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class AtomixSetConsumer extends AbstractAtomixClientConsumer<AtomixSetEndpoint> { + private static final Logger LOGGER = LoggerFactory.getLogger(AtomixSetConsumer.class); + + private final List<Listener<DistributedSet.ValueEvent<Object>>> listeners; + private final String resourceName; + private final String resultHeader; + private DistributedSet<Object> set; + + public AtomixSetConsumer(AtomixSetEndpoint endpoint, Processor processor, String resourceName) { + super(endpoint, processor); + this.listeners = new ArrayList<>(); + this.resourceName = resourceName; + this.resultHeader = endpoint.getConfiguration().getResultHeader(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + this.set = getAtomixEndpoint() + .getAtomix() + .getSet( + resourceName, + new DistributedSet.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)), + new DistributedSet.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName))) + .join(); + + + LOGGER.debug("Subscribe to events for set: {}", resourceName); + this.listeners.add(this.set.onAdd(this::onEvent).join()); + this.listeners.add(this.set.onRemove(this::onEvent).join()); + } + + @Override + protected void doStop() throws Exception { + // close listeners + listeners.forEach(Listener::close); + + super.doStart(); + } + + // ******************************************** + // Event handler + // ******************************************** + + private void onEvent(DistributedSet.ValueEvent<Object> event) { + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, event.type()); + + if (resultHeader == null) { + exchange.getIn().setBody(event.value()); + } else { + exchange.getIn().setHeader(resultHeader, event.value()); + } + + try { + getProcessor().process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetEndpoint.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetEndpoint.java new file mode 100644 index 0000000..9a1c64c --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetEndpoint.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.set; + + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.atomix.client.AbstractAtomixClientEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; + +@UriEndpoint( + firstVersion = "2.20.0", + scheme = "atomix-set", + title = "Atomix Set", + syntax = "atomix-set:setName", + consumerClass = AtomixSetConsumer.class, + label = "clustering") +final class AtomixSetEndpoint extends AbstractAtomixClientEndpoint<AtomixSetComponent, AtomixSetConfiguration> { + @UriParam + private AtomixSetConfiguration configuration; + + public AtomixSetEndpoint(String uri, AtomixSetComponent component, String resourceName) { + super(uri, component, resourceName); + } + + @Override + public Producer createProducer() throws Exception { + return new AtomixSetProducer(this); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new AtomixSetConsumer(this, processor, getResourceName()); + } + + @Override + public AtomixSetConfiguration getConfiguration() { + return this.configuration; + } + + @Override + public void setConfiguration(AtomixSetConfiguration configuration) { + this.configuration = configuration; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java new file mode 100644 index 0000000..87a4406 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.set; + +import java.time.Duration; + +import io.atomix.collections.DistributedSet; +import io.atomix.resource.ReadConsistency; +import org.apache.camel.AsyncCallback; +import org.apache.camel.InvokeOnHeader; +import org.apache.camel.Message; +import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer; +import org.apache.camel.util.ObjectHelper; + +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_TTL; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE; + +final class AtomixSetProducer extends AbstractAtomixClientProducer<AtomixSetEndpoint, DistributedSet> { + private final AtomixSetConfiguration configuration; + + protected AtomixSetProducer(AtomixSetEndpoint endpoint) { + super(endpoint); + this.configuration = endpoint.getConfiguration(); + } + + // ********************************* + // Handlers + // ********************************* + + @InvokeOnHeader("ADD") + boolean onAdd(Message message, AsyncCallback callback) throws Exception { + final DistributedSet<Object> set = getResource(message); + final long ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, long.class); + final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + ObjectHelper.notNull(val, RESOURCE_VALUE); + + if (ttl > 0) { + set.add(val, Duration.ofMillis(ttl)).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + set.add(val).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("CLEAR") + boolean onClear(Message message, AsyncCallback callback) throws Exception { + final DistributedSet<Object> set = getResource(message); + + set.clear().thenAccept( + result -> processResult(message, callback, result) + ); + + return false; + } + + @InvokeOnHeader("CONTAINS") + boolean onContains(Message message, AsyncCallback callback) throws Exception { + final DistributedSet<Object> set = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + ObjectHelper.notNull(value, RESOURCE_VALUE); + + if (consistency != null) { + set.contains(value, consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + set.contains(value).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("IS_EMPTY") + boolean onIsEmpty(Message message, AsyncCallback callback) throws Exception { + final DistributedSet<Object> set = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + if (consistency != null) { + set.isEmpty(consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + set.isEmpty().thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("REMOVE") + boolean onRemove(Message message, AsyncCallback callback) throws Exception { + final DistributedSet<Object> set = getResource(message); + final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + ObjectHelper.notNull(value, RESOURCE_VALUE); + + set.remove(value).thenAccept( + result -> processResult(message, callback, result) + ); + + return false; + } + + @InvokeOnHeader("SIZE") + boolean onSize(Message message, AsyncCallback callback) throws Exception { + final DistributedSet<Object> set = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + if (consistency != null) { + set.size(consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + set.size().thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + // ********************************* + // Implementation + // ********************************* + + @Override + protected String getProcessorKey(Message message) { + return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class); + } + + @Override + protected String getResourceName(Message message) { + return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class); + } + + @Override + protected DistributedSet<Object> createResource(String resourceName) { + return getAtomixEndpoint() + .getAtomix() + .getSet( + resourceName, + new DistributedSet.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)), + new DistributedSet.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName))) + .join(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValue.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValue.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValue.java new file mode 100644 index 0000000..0f082f6 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValue.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.value; + +public final class AtomixValue { + + public enum Action { + SET, + GET, + GET_AND_SET, + COMPARE_AND_SET + } + + private AtomixValue() { + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueComponent.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueComponent.java new file mode 100644 index 0000000..d6d7ffb --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueComponent.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.value; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.component.atomix.client.AbstractAtomixClientComponent; + +public final class AtomixValueComponent extends AbstractAtomixClientComponent<AtomixValueConfiguration> { + private AtomixValueConfiguration configuration = new AtomixValueConfiguration(); + + public AtomixValueComponent() { + super(); + } + + public AtomixValueComponent(CamelContext camelContext) { + super(camelContext); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + AtomixValueConfiguration configuration = this.configuration.copy(); + + // Bind options to the configuration object + setConfigurationProperties(configuration, parameters); + + AtomixValueEndpoint endpoint = new AtomixValueEndpoint(uri, this, remaining); + endpoint.setConfiguration(configuration); + + setProperties(endpoint, parameters); + + return endpoint; + } + + // ********************************************** + // Properties + // ********************************************** + + public AtomixValueConfiguration getConfiguration() { + return this.configuration; + } + + /** + * The shared component configuration + */ + public void setConfiguration(AtomixValueConfiguration configuration) { + this.configuration = configuration; + } + + @Override + protected AtomixValueConfiguration getComponentConfiguration() { + return getConfiguration(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConfiguration.java new file mode 100644 index 0000000..79f8aa7 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConfiguration.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.value; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.atomix.client.AtomixClientConfiguration; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriParams; + +@UriParams +public class AtomixValueConfiguration extends AtomixClientConfiguration { + @UriParam(defaultValue = "SET") + private AtomixValue.Action defaultAction = AtomixValue.Action.SET; + @UriParam + private long ttl; + + // **************************************** + // Properties + // **************************************** + + public AtomixValue.Action getDefaultAction() { + return defaultAction; + } + + /** + * The default action. + */ + public void setDefaultAction(AtomixValue.Action defaultAction) { + this.defaultAction = defaultAction; + } + + public long getTtl() { + return ttl; + } + + /** + * The resource ttl. + */ + public void setTtl(long ttl) { + this.ttl = ttl; + } + + // **************************************** + // Copy + // **************************************** + + public AtomixValueConfiguration copy() { + try { + return (AtomixValueConfiguration) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java new file mode 100644 index 0000000..ecb759e --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.value; + +import java.util.ArrayList; +import java.util.List; + +import io.atomix.catalyst.concurrent.Listener; +import io.atomix.variables.DistributedValue; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.atomix.client.AbstractAtomixClientConsumer; +import org.apache.camel.component.atomix.client.AtomixClientConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class AtomixValueConsumer extends AbstractAtomixClientConsumer<AtomixValueEndpoint> { + private static final Logger LOGGER = LoggerFactory.getLogger(AtomixValueConsumer.class); + + private final List<Listener<DistributedValue.ChangeEvent<Object>>> listeners; + private final String resourceName; + private final String resultHeader; + private DistributedValue<Object> value; + + public AtomixValueConsumer(AtomixValueEndpoint endpoint, Processor processor, String resourceName) { + super(endpoint, processor); + this.listeners = new ArrayList<>(); + this.resourceName = resourceName; + this.resultHeader = endpoint.getConfiguration().getResultHeader(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + this.value = getAtomixEndpoint() + .getAtomix() + .getValue( + resourceName, + new DistributedValue.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)), + new DistributedValue.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName))) + .join(); + + + LOGGER.debug("Subscribe to events for queue: {}", resourceName); + this.listeners.add(this.value.onChange(this::onEvent).join()); + } + + @Override + protected void doStop() throws Exception { + // close listeners + listeners.forEach(Listener::close); + + super.doStart(); + } + + // ******************************************** + // Event handler + // ******************************************** + + private void onEvent(DistributedValue.ChangeEvent<Object> event) { + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, event.type()); + exchange.getIn().setHeader(AtomixClientConstants.RESOURCE_OLD_VALUE, event.oldValue()); + + if (resultHeader == null) { + exchange.getIn().setBody(event.newValue()); + } else { + exchange.getIn().setHeader(resultHeader, event.newValue()); + } + + try { + getProcessor().process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueEndpoint.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueEndpoint.java new file mode 100644 index 0000000..e4478e0 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueEndpoint.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.value; + + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.atomix.client.AbstractAtomixClientEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; + +@UriEndpoint( + firstVersion = "2.20.0", + scheme = "atomix-value", + title = "Atomix Value", + syntax = "atomix-value:valueName", + consumerClass = AtomixValueConsumer.class, + label = "clustering") +final class AtomixValueEndpoint extends AbstractAtomixClientEndpoint<AtomixValueComponent, AtomixValueConfiguration> { + @UriParam + private AtomixValueConfiguration configuration; + + public AtomixValueEndpoint(String uri, AtomixValueComponent component, String resourceName) { + super(uri, component, resourceName); + } + + @Override + public Producer createProducer() throws Exception { + return new AtomixValueProducer(this); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new AtomixValueConsumer(this, processor, getResourceName()); + } + + @Override + public AtomixValueConfiguration getConfiguration() { + return this.configuration; + } + + @Override + public void setConfiguration(AtomixValueConfiguration configuration) { + this.configuration = configuration; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java new file mode 100644 index 0000000..c6ca5c4 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.value; + +import java.time.Duration; + +import io.atomix.resource.ReadConsistency; +import io.atomix.variables.DistributedValue; +import org.apache.camel.AsyncCallback; +import org.apache.camel.InvokeOnHeader; +import org.apache.camel.Message; +import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer; +import org.apache.camel.util.ObjectHelper; + +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_OLD_VALUE; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_TTL; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE; + +final class AtomixValueProducer extends AbstractAtomixClientProducer<AtomixValueEndpoint, DistributedValue> { + private final AtomixValueConfiguration configuration; + + protected AtomixValueProducer(AtomixValueEndpoint endpoint) { + super(endpoint); + this.configuration = endpoint.getConfiguration(); + } + + // ********************************* + // Handlers + // ********************************* + + @InvokeOnHeader("SET") + boolean onSet(Message message, AsyncCallback callback) throws Exception { + final DistributedValue<Object> value = getResource(message); + final long ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, long.class); + final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + ObjectHelper.notNull(val, RESOURCE_VALUE); + + if (ttl > 0) { + value.set(val, Duration.ofMillis(ttl)).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + value.set(val).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("GET") + boolean onGet(Message message, AsyncCallback callback) throws Exception { + final DistributedValue<Object> value = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + if (consistency != null) { + value.get(consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + value.get().thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("GET_AND_SET") + boolean onGetAndSet(Message message, AsyncCallback callback) throws Exception { + final DistributedValue<Object> value = getResource(message); + final long ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, long.class); + final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + ObjectHelper.notNull(val, RESOURCE_VALUE); + + if (ttl > 0) { + value.getAndSet(val, Duration.ofMillis(ttl)).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + value.getAndSet(val).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("COMPARE_AND_SET") + boolean onCompareAndSet(Message message, AsyncCallback callback) throws Exception { + final DistributedValue<Object> value = getResource(message); + final long ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, long.class); + final Object newVal = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + final Object oldVal = message.getHeader(RESOURCE_OLD_VALUE, Object.class); + + ObjectHelper.notNull(newVal, RESOURCE_VALUE); + ObjectHelper.notNull(oldVal, RESOURCE_OLD_VALUE); + + if (ttl > 0) { + value.compareAndSet(oldVal, newVal, Duration.ofMillis(ttl)).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + value.compareAndSet(oldVal, newVal).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + // ********************************* + // Implementation + // ********************************* + + @Override + protected String getProcessorKey(Message message) { + return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class); + } + + @Override + protected String getResourceName(Message message) { + return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class); + } + + @Override + protected DistributedValue<Object> createResource(String resourceName) { + return getAtomixEndpoint() + .getAtomix() + .getValue( + resourceName, + new DistributedValue.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)), + new DistributedValue.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName))) + .join(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterComponent.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterComponent.java deleted file mode 100644 index 8db6487..0000000 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterComponent.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.atomix.cluster; - -import java.util.Map; - -import org.apache.camel.CamelContext; -import org.apache.camel.Endpoint; -import org.apache.camel.impl.DefaultComponent; - -public class AtomixClusterComponent extends DefaultComponent { - public AtomixClusterComponent() { - } - - public AtomixClusterComponent(CamelContext camelContext) { - super(camelContext); - } - - @Override - protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - throw new UnsupportedOperationException("Not yet implemented"); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java deleted file mode 100644 index b05230f..0000000 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.atomix.cluster; - -import io.atomix.AtomixReplica; -import io.atomix.catalyst.transport.Transport; -import io.atomix.copycat.server.storage.StorageLevel; -import org.apache.camel.RuntimeCamelException; -import org.apache.camel.component.atomix.AtomixConfiguration; -import org.apache.camel.spi.UriParam; -import org.apache.camel.spi.UriParams; - -@UriParams -public class AtomixClusterConfiguration extends AtomixConfiguration<AtomixReplica> implements Cloneable { - - @UriParam - private Class<? extends Transport> clientTransport; - - @UriParam - private Class<? extends Transport> serverTransport; - - @UriParam - private String storagePath; - - @UriParam(defaultValue = "MEMORY") - private StorageLevel storageLevel = StorageLevel.MEMORY; - - public AtomixClusterConfiguration() { - } - - // ****************************************** - // Properties - // ****************************************** - - - public Class<? extends Transport> getClientTransport() { - return clientTransport; - } - - /** - * The client transport - */ - public void setClientTransport(Class<? extends Transport> clientTransport) { - this.clientTransport = clientTransport; - } - - public Class<? extends Transport> getServerTransport() { - return serverTransport; - } - - /** - * The server transport - */ - public void setServerTransport(Class<? extends Transport> serverTransport) { - this.serverTransport = serverTransport; - } - - public String getStoragePath() { - return storagePath; - } - - /** - * Sets the log directory. - */ - public void setStoragePath(String storagePath) { - this.storagePath = storagePath; - } - - public StorageLevel getStorageLevel() { - return storageLevel; - } - - /** - * Sets the log storage level. - */ - public void setStorageLevel(StorageLevel storageLevel) { - this.storageLevel = storageLevel; - } - - // **************************************** - // Copy - // **************************************** - - public AtomixClusterConfiguration copy() { - try { - return (AtomixClusterConfiguration) super.clone(); - } catch (CloneNotSupportedException e) { - throw new RuntimeCamelException(e); - } - } -} \ No newline at end of file