http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducer.java deleted file mode 100644 index 6e7fee2..0000000 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducer.java +++ /dev/null @@ -1,363 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.atomix.client.map; - -import java.time.Duration; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import io.atomix.collections.DistributedMap; -import io.atomix.resource.ReadConsistency; -import org.apache.camel.AsyncCallback; -import org.apache.camel.Message; -import org.apache.camel.component.atomix.client.AbstractAsyncAtomixClientProducer; -import org.apache.camel.component.atomix.client.AtomixClientAction; -import org.apache.camel.util.ExchangeHelper; -import org.apache.camel.util.ObjectHelper; - -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_DEFAULT_VALUE; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_KEY; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_OLD_VALUE; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_TTL; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE; - -final class AtomixClientMapProducer extends AbstractAsyncAtomixClientProducer<AtomixClientMapEndpoint> { - private final String mapName; - private final ConcurrentMap<String, DistributedMap<Object, Object>> maps; - private final AtomixClientMapConfiguration configuration; - - protected AtomixClientMapProducer(AtomixClientMapEndpoint endpoint, String mapName) { - super(endpoint); - this.mapName = ObjectHelper.notNull(mapName, "map name"); - this.configuration = endpoint.getAtomixConfiguration(); - this.maps = new ConcurrentHashMap<>(); - } - - @Override - protected AtomixClientAction getAction(Message message) { - return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, AtomixClientAction.class); - } - - // ********************************* - // Handlers - // ********************************* - - @AsyncInvokeOnHeader(AtomixClientAction.PUT) - boolean onPut(Message message, AsyncCallback callback) throws Exception { - final DistributedMap<Object, Object> map = getMap(message); - final Object key = ExchangeHelper.getMandatoryHeader(message, RESOURCE_KEY, Object.class); - final Duration ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, Duration.class); - - if (ttl != null) { - map.put( - key, - message.getMandatoryBody(), - ttl - ).thenAccept( - result -> processResult(message, callback, result) - ); - } else { - map.put( - key, - message.getMandatoryBody() - ).thenAccept( - result -> processResult(message, callback, result) - ); - } - - return false; - } - - @AsyncInvokeOnHeader(AtomixClientAction.PUT_IF_ABSENT) - boolean onPutIfAbsent(Message message, AsyncCallback callback) throws Exception { - final DistributedMap<Object, Object> map = getMap(message); - final Object key = ExchangeHelper.getMandatoryHeader(message, RESOURCE_KEY, Object.class); - final Duration ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, Duration.class); - - if (ttl != null) { - map.putIfAbsent( - key, - message.getMandatoryBody(), - ttl - ).thenAccept( - result -> processResult(message, callback, result) - ); - } else { - map.putIfAbsent( - key, - message.getMandatoryBody() - ).thenAccept( - result -> processResult(message, callback, result) - ); - } - - return false; - } - - @AsyncInvokeOnHeader(AtomixClientAction.GET) - boolean onGet(Message message, AsyncCallback callback) throws Exception { - final DistributedMap<Object, Object> map = getMap(message); - final Object key = ExchangeHelper.getMandatoryHeader(message, RESOURCE_KEY, Object.class); - final Object defaultValue = message.getHeader(RESOURCE_DEFAULT_VALUE); - final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); - - if (consistency != null) { - if (defaultValue != null) { - map.getOrDefault(key, defaultValue, consistency).thenAccept( - result -> processResult(message, callback, result) - ); - } else { - map.get(key, consistency).thenAccept( - result -> processResult(message, callback, result) - ); - } - } else { - if (defaultValue != null) { - map.getOrDefault(key, defaultValue).thenAccept( - result -> processResult(message, callback, result) - ); - } else { - map.get(key).thenAccept( - result -> processResult(message, callback, result) - ); - } - } - - return false; - } - - @AsyncInvokeOnHeader(AtomixClientAction.CLEAR) - boolean onClear(Message message, AsyncCallback callback) throws Exception { - final DistributedMap<Object, Object> map = getMap(message); - - map.clear().thenAccept( - result -> processResult(message, callback, result) - ); - - return false; - } - - @AsyncInvokeOnHeader(AtomixClientAction.SIZE) - boolean onSize(Message message, AsyncCallback callback) throws Exception { - final DistributedMap<Object, Object> map = getMap(message); - final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); - - if (consistency != null) { - map.size(consistency).thenAccept( - result -> processResult(message, callback, result) - ); - } else { - map.size().thenAccept( - result -> processResult(message, callback, result) - ); - } - - return false; - } - - @AsyncInvokeOnHeader(AtomixClientAction.IS_EMPTY) - boolean onIsEmpty(Message message, AsyncCallback callback) throws Exception { - final DistributedMap<Object, Object> map = getMap(message); - final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); - - if (consistency != null) { - map.isEmpty(consistency).thenAccept( - result -> processResult(message, callback, result) - ); - } else { - map.isEmpty().thenAccept( - result -> processResult(message, callback, result) - ); - } - - return false; - } - - @AsyncInvokeOnHeader(AtomixClientAction.ENTRY_SET) - boolean onEntrySet(Message message, AsyncCallback callback) throws Exception { - final DistributedMap<Object, Object> map = getMap(message); - final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); - - if (consistency != null) { - map.entrySet(consistency).thenAccept( - result -> processResult(message, callback, result) - ); - } else { - map.entrySet().thenAccept( - result -> processResult(message, callback, result) - ); - } - - return false; - } - - @AsyncInvokeOnHeader(AtomixClientAction.VALUES) - boolean onValues(Message message, AsyncCallback callback) throws Exception { - final DistributedMap<Object, Object> map = getMap(message); - final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); - - if (consistency != null) { - map.values(consistency).thenAccept( - result -> processResult(message, callback, result) - ); - } else { - map.values().thenAccept( - result -> processResult(message, callback, result) - ); - } - - return false; - } - - @AsyncInvokeOnHeader(AtomixClientAction.CONTAINS_KEY) - boolean onContainsKey(Message message, AsyncCallback callback) throws Exception { - final DistributedMap<Object, Object> map = getMap(message); - final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); - final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); - - ObjectHelper.notNull(key, RESOURCE_KEY); - - if (consistency != null) { - map.containsKey(key, consistency).thenAccept( - result -> processResult(message, callback, result) - ); - } else { - map.containsKey(key).thenAccept( - result -> processResult(message, callback, result) - ); - } - - return false; - } - - @AsyncInvokeOnHeader(AtomixClientAction.CONTAINS_VALUE) - boolean onContainsValue(Message message, AsyncCallback callback) throws Exception { - final DistributedMap<Object, Object> map = getMap(message); - final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); - final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); - - ObjectHelper.notNull(value, RESOURCE_VALUE); - - if (consistency != null) { - map.containsValue(value, consistency).thenAccept( - result -> processResult(message, callback, result) - ); - } else { - map.containsValue(value).thenAccept( - result -> processResult(message, callback, result) - ); - } - - return false; - } - - @AsyncInvokeOnHeader(AtomixClientAction.REMOVE) - boolean onRemove(Message message, AsyncCallback callback) throws Exception { - final DistributedMap<Object, Object> map = getMap(message); - final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); - final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); - - ObjectHelper.notNull(key, RESOURCE_VALUE); - - if (value != null) { - map.remove(key, value).thenAccept( - result -> processResult(message, callback, result) - ); - } else { - map.remove(key).thenAccept( - result -> processResult(message, callback, result) - ); - } - - return false; - } - - @AsyncInvokeOnHeader(AtomixClientAction.REPLACE) - boolean onReplace(Message message, AsyncCallback callback) throws Exception { - final DistributedMap<Object, Object> map = getMap(message); - final Duration ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, Duration.class); - final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); - final Object newValue = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); - final Object oldValue = message.getHeader(RESOURCE_OLD_VALUE, Object.class); - - ObjectHelper.notNull(key, RESOURCE_VALUE); - ObjectHelper.notNull(newValue, RESOURCE_VALUE); - - if (ttl != null) { - if (oldValue != null) { - map.replace(key, oldValue, newValue, ttl).thenAccept( - result -> processResult(message, callback, result) - ); - } else { - map.replace(key, newValue, ttl).thenAccept( - result -> processResult(message, callback, result) - ); - } - } else { - if (oldValue != null) { - map.replace(key, oldValue, newValue).thenAccept( - result -> processResult(message, callback, result) - ); - } else { - map.replace(key, newValue).thenAccept( - result -> processResult(message, callback, result) - ); - } - } - - return false; - } - - // ********************************* - // Helpers - // ********************************* - - private void processResult(Message message, AsyncCallback callback, Object result) { - if (result != null && !(result instanceof Void)) { - message.setHeader(RESOURCE_ACTION_HAS_RESULT, true); - - String resultHeader = configuration.getResultHeader(); - if (resultHeader != null) { - message.setHeader(resultHeader, result); - } else { - message.setBody(result); - } - } else { - message.setHeader(RESOURCE_ACTION_HAS_RESULT, false); - } - - callback.done(false); - } - - private DistributedMap<Object, Object> getMap(Message message) { - return maps.computeIfAbsent( - message.getHeader(RESOURCE_NAME, getAtomixEndpoint().getMapName(), String.class), - name -> { - return getAtomixEndpoint() - .getAtomix() - .getMap(name) - //getAtomixEndpoint().getAtomixConfiguration().getConfig(), - //getAtomixEndpoint().getAtomixConfiguration().getOptions()) - .join(); - } - ); - } -}
http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMap.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMap.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMap.java new file mode 100644 index 0000000..9a17ff8 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMap.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.map; + +public final class AtomixMap { + + public enum Action { + PUT, + PUT_IF_ABSENT, + GET, + CLEAR, + SIZE, + CONTAINS_KEY, + CONTAINS_VALUE, + IS_EMPTY, + ENTRY_SET, + REMOVE, + REPLACE, + VALUES + } + + private AtomixMap() { + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapComponent.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapComponent.java new file mode 100644 index 0000000..4086149 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapComponent.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.map; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.component.atomix.client.AbstractAtomixClientComponent; + +public final class AtomixMapComponent extends AbstractAtomixClientComponent<AtomixMapConfiguration> { + private AtomixMapConfiguration configuration = new AtomixMapConfiguration(); + + public AtomixMapComponent() { + super(); + } + + public AtomixMapComponent(CamelContext camelContext) { + super(camelContext); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + AtomixMapConfiguration configuration = this.configuration.copy(); + + // Bind options to the configuration object + setConfigurationProperties(configuration, parameters); + + AtomixMapEndpoint endpoint = new AtomixMapEndpoint(uri, this, remaining); + endpoint.setConfiguration(configuration); + + setProperties(endpoint, parameters); + + return endpoint; + } + + // ********************************************** + // Properties + // ********************************************** + + public AtomixMapConfiguration getConfiguration() { + return this.configuration; + } + + /** + * The shared component configuration + */ + public void setConfiguration(AtomixMapConfiguration configuration) { + this.configuration = configuration; + } + + @Override + protected AtomixMapConfiguration getComponentConfiguration() { + return getConfiguration(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConfiguration.java new file mode 100644 index 0000000..6b9f87a --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConfiguration.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.map; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.atomix.client.AtomixClientConfiguration; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriParams; + +@UriParams +public final class AtomixMapConfiguration extends AtomixClientConfiguration { + @UriParam(defaultValue = "PUT") + private AtomixMap.Action defaultAction = AtomixMap.Action.PUT; + @UriParam + private Object key; + @UriParam + private long ttl; + + // **************************************** + // Properties + // **************************************** + + public AtomixMap.Action getDefaultAction() { + return defaultAction; + } + + /** + * The default action. + */ + public void setDefaultAction(AtomixMap.Action defaultAction) { + this.defaultAction = defaultAction; + } + + public Object getKey() { + return key; + } + + /** + * The key to use if none is set in the header or to listen for events for + * a specific key. + */ + public void setKey(Object defaultKey) { + this.key = defaultKey; + } + + public long getTtl() { + return ttl; + } + + /** + * The resource ttl. + */ + public void setTtl(long ttl) { + this.ttl = ttl; + } + + // **************************************** + // Copy + // **************************************** + + public AtomixMapConfiguration copy() { + try { + return (AtomixMapConfiguration) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java new file mode 100644 index 0000000..73269ee --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.map; + +import java.util.ArrayList; +import java.util.List; + +import io.atomix.catalyst.concurrent.Listener; +import io.atomix.collections.DistributedMap; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.atomix.client.AbstractAtomixClientConsumer; +import org.apache.camel.component.atomix.client.AtomixClientConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class AtomixMapConsumer extends AbstractAtomixClientConsumer<AtomixMapEndpoint> { + private static final Logger LOGGER = LoggerFactory.getLogger(AtomixMapConsumer.class); + + private final List<Listener<DistributedMap.EntryEvent<Object, Object>>> listeners; + private final String resourceName; + private final String resultHeader; + private DistributedMap<Object, Object> map; + + public AtomixMapConsumer(AtomixMapEndpoint endpoint, Processor processor, String resourceName) { + super(endpoint, processor); + this.listeners = new ArrayList<>(); + this.resourceName = resourceName; + this.resultHeader = endpoint.getConfiguration().getResultHeader(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + this.map = getAtomixEndpoint() + .getAtomix() + .getMap( + resourceName, + new DistributedMap.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)), + new DistributedMap.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName))) + .join(); + + + Object key = getAtomixEndpoint().getConfiguration().getKey(); + if (key == null) { + LOGGER.debug("Subscribe to events for map: {}", resourceName); + this.listeners.add(this.map.onAdd(this::onEvent).join()); + this.listeners.add(this.map.onRemove(this::onEvent).join()); + this.listeners.add(this.map.onUpdate(this::onEvent).join()); + } else { + LOGGER.debug("Subscribe to events for map: {}, key: {}", resourceName, key); + this.listeners.add(this.map.onAdd(key, this::onEvent).join()); + this.listeners.add(this.map.onRemove(key, this::onEvent).join()); + this.listeners.add(this.map.onUpdate(key, this::onEvent).join()); + } + } + + @Override + protected void doStop() throws Exception { + // close listeners + listeners.forEach(Listener::close); + + super.doStart(); + } + + // ******************************************** + // Event handler + // ******************************************** + + private void onEvent(DistributedMap.EntryEvent<Object, Object> event) { + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, event.type()); + exchange.getIn().setHeader(AtomixClientConstants.RESOURCE_KEY, event.entry().getKey()); + + if (resultHeader == null) { + exchange.getIn().setBody(event.entry().getValue()); + } else { + exchange.getIn().setHeader(resultHeader, event.entry().getValue()); + } + + try { + getProcessor().process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapEndpoint.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapEndpoint.java new file mode 100644 index 0000000..cf9bb63 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapEndpoint.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.map; + + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.atomix.client.AbstractAtomixClientEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; + +@UriEndpoint( + firstVersion = "2.20.0", + scheme = "atomix-map", + title = "Atomix Map", + syntax = "atomix-map:mapName", + consumerClass = AtomixMapConsumer.class, + label = "clustering") +class AtomixMapEndpoint extends AbstractAtomixClientEndpoint<AtomixMapComponent, AtomixMapConfiguration> { + @UriParam + private AtomixMapConfiguration configuration; + + public AtomixMapEndpoint(String uri, AtomixMapComponent component, String resourceName) { + super(uri, component, resourceName); + } + + @Override + public Producer createProducer() throws Exception { + return new AtomixMapProducer(this); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new AtomixMapConsumer(this, processor, getResourceName()); + } + + @Override + public AtomixMapConfiguration getConfiguration() { + return this.configuration; + } + + @Override + public void setConfiguration(AtomixMapConfiguration configuration) { + this.configuration = configuration; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapProducer.java new file mode 100644 index 0000000..8bdbf8e --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapProducer.java @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.map; + +import java.time.Duration; + +import io.atomix.collections.DistributedMap; +import io.atomix.resource.ReadConsistency; +import org.apache.camel.AsyncCallback; +import org.apache.camel.InvokeOnHeader; +import org.apache.camel.Message; +import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer; +import org.apache.camel.util.ObjectHelper; + +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_DEFAULT_VALUE; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_KEY; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_OLD_VALUE; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_TTL; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE; + +final class AtomixMapProducer extends AbstractAtomixClientProducer<AtomixMapEndpoint, DistributedMap<Object, Object>> { + private final AtomixMapConfiguration configuration; + + protected AtomixMapProducer(AtomixMapEndpoint endpoint) { + super(endpoint); + this.configuration = endpoint.getConfiguration(); + } + + // ********************************* + // Handlers + // ********************************* + + @InvokeOnHeader("PUT") + boolean onPut(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getResource(message); + final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class); + final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + final long ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, long.class); + + ObjectHelper.notNull(key, RESOURCE_KEY); + ObjectHelper.notNull(val, RESOURCE_VALUE); + + if (ttl > 0) { + map.put(key, val, Duration.ofMillis(ttl)).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.put(key, val).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("PUT_IF_ABSENT") + boolean onPutIfAbsent(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getResource(message); + final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class); + final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + final long ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, long.class); + + ObjectHelper.notNull(key, RESOURCE_KEY); + ObjectHelper.notNull(val, RESOURCE_VALUE); + + if (ttl > 0) { + map.putIfAbsent(key, val, Duration.ofMillis(ttl)).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.putIfAbsent(key, val).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("GET") + boolean onGet(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getResource(message); + final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class); + final Object defaultValue = message.getHeader(RESOURCE_DEFAULT_VALUE); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + ObjectHelper.notNull(key, RESOURCE_KEY); + + if (consistency != null) { + if (defaultValue != null) { + map.getOrDefault(key, defaultValue, consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.get(key, consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } + } else { + if (defaultValue != null) { + map.getOrDefault(key, defaultValue).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.get(key).thenAccept( + result -> processResult(message, callback, result) + ); + } + } + + return false; + } + + @InvokeOnHeader("CLEAR") + boolean onClear(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getResource(message); + + map.clear().thenAccept( + result -> processResult(message, callback, result) + ); + + return false; + } + + @InvokeOnHeader("SIZE") + boolean onSize(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + if (consistency != null) { + map.size(consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.size().thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("IS_EMPTY") + boolean onIsEmpty(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + if (consistency != null) { + map.isEmpty(consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.isEmpty().thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("ENTRY_SET") + boolean onEntrySet(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + if (consistency != null) { + map.entrySet(consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.entrySet().thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("VALUES") + boolean onValues(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + if (consistency != null) { + map.values(consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.values().thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("CONTAINS_KEY") + boolean onContainsKey(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); + + ObjectHelper.notNull(key, RESOURCE_KEY); + + if (consistency != null) { + map.containsKey(key, consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.containsKey(key).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("CONTAINS_VALUE") + boolean onContainsValue(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getResource(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + ObjectHelper.notNull(value, RESOURCE_VALUE); + + if (consistency != null) { + map.containsValue(value, consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.containsValue(value).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("REMOVE") + boolean onRemove(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getResource(message); + final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); + final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + ObjectHelper.notNull(key, RESOURCE_KEY); + + if (value != null) { + map.remove(key, value).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.remove(key).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @InvokeOnHeader("REPLACE") + boolean onReplace(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getResource(message); + final long ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, long.class); + final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class); + final Object newValue = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + final Object oldValue = message.getHeader(RESOURCE_OLD_VALUE, Object.class); + + ObjectHelper.notNull(key, RESOURCE_KEY); + ObjectHelper.notNull(newValue, RESOURCE_VALUE); + + if (ttl > 0) { + if (oldValue != null) { + map.replace(key, oldValue, newValue, Duration.ofMillis(ttl)).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.replace(key, newValue, Duration.ofMillis(ttl)).thenAccept( + result -> processResult(message, callback, result) + ); + } + } else { + if (oldValue != null) { + map.replace(key, oldValue, newValue).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.replace(key, newValue).thenAccept( + result -> processResult(message, callback, result) + ); + } + } + + return false; + } + + // ********************************* + // Implementation + // ********************************* + + @Override + protected String getProcessorKey(Message message) { + return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class); + } + + @Override + protected String getResourceName(Message message) { + return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class); + } + + @Override + protected DistributedMap<Object, Object> createResource(String resourceName) { + return getAtomixEndpoint() + .getAtomix() + .getMap( + resourceName, + new DistributedMap.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)), + new DistributedMap.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName))) + .join(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessaging.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessaging.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessaging.java new file mode 100644 index 0000000..a1f0d94 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessaging.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.messaging; + +import io.atomix.group.messaging.MessageProducer; + +public final class AtomixMessaging { + + // **************************************** + // Constants + // **************************************** + + static final MessageProducer.Options OPTIONS_DIRECT = new MessageProducer.Options() + .withExecution(MessageProducer.Execution.SYNC) + .withDelivery(MessageProducer.Delivery.DIRECT); + static final MessageProducer.Options OPTIONS_BROADCAST = new MessageProducer.Options() + .withExecution(MessageProducer.Execution.ASYNC) + .withDelivery(MessageProducer.Delivery.BROADCAST); + static final MessageProducer.Options OPTIONS_BROADCAST_RANDOM = new MessageProducer.Options() + .withExecution(MessageProducer.Execution.ASYNC) + .withDelivery(MessageProducer.Delivery.RANDOM); + + // **************************************** + // Emums + // **************************************** + + public enum BroadcastType { + ALL, + RANDOM + } + + public enum Action { + DIRECT, + BROADCAST + //TODO: REQUEST_REPLY + } + + // **************************************** + // + // **************************************** + + private AtomixMessaging() { + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingComponent.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingComponent.java new file mode 100644 index 0000000..c8bc756 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingComponent.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.messaging; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.component.atomix.client.AbstractAtomixClientComponent; + +public final class AtomixMessagingComponent extends AbstractAtomixClientComponent<AtomixMessagingConfiguration> { + private AtomixMessagingConfiguration configuration = new AtomixMessagingConfiguration(); + + public AtomixMessagingComponent() { + super(); + } + + public AtomixMessagingComponent(CamelContext camelContext) { + super(camelContext); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + AtomixMessagingConfiguration configuration = this.configuration.copy(); + + // Bind options to the configuration object + setConfigurationProperties(configuration, parameters); + + AtomixMessagingEndpoint endpoint = new AtomixMessagingEndpoint(uri, this, remaining); + endpoint.setConfiguration(configuration); + + setProperties(endpoint, parameters); + + return endpoint; + } + + // ********************************************** + // Properties + // ********************************************** + + public AtomixMessagingConfiguration getConfiguration() { + return this.configuration; + } + + /** + * The shared component configuration + */ + public void setConfiguration(AtomixMessagingConfiguration configuration) { + this.configuration = configuration; + } + + @Override + protected AtomixMessagingConfiguration getComponentConfiguration() { + return getConfiguration(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConfiguration.java new file mode 100644 index 0000000..7e92d57 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConfiguration.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.messaging; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.atomix.client.AtomixClientConfiguration; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriParams; + +@UriParams +public class AtomixMessagingConfiguration extends AtomixClientConfiguration { + @UriParam(defaultValue = "DIRECT") + private AtomixMessaging.Action defaultAction = AtomixMessaging.Action.DIRECT; + @UriParam + private String memberName; + @UriParam + private String channelName; + @UriParam(defaultValue = "ALL") + private AtomixMessaging.BroadcastType broadcastType = AtomixMessaging.BroadcastType.ALL; + + // **************************************** + // Properties + // **************************************** + + public AtomixMessaging.Action getDefaultAction() { + return defaultAction; + } + + /** + * The default action. + */ + public void setDefaultAction(AtomixMessaging.Action defaultAction) { + this.defaultAction = defaultAction; + } + + public String getMemberName() { + return memberName; + } + + /** + * The Atomix Group member name + */ + public void setMemberName(String memberName) { + this.memberName = memberName; + } + + public String getChannelName() { + return channelName; + } + + /** + * The messaging channel name + */ + public void setChannelName(String channelName) { + this.channelName = channelName; + } + + public AtomixMessaging.BroadcastType getBroadcastType() { + return broadcastType; + } + + /** + * The broadcast type. + */ + public void setBroadcastType(AtomixMessaging.BroadcastType broadcastType) { + this.broadcastType = broadcastType; + } + + // **************************************** + // Copy + // **************************************** + + public AtomixMessagingConfiguration copy() { + try { + return (AtomixMessagingConfiguration) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java new file mode 100644 index 0000000..547a66c --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.messaging; + +import java.util.ArrayList; +import java.util.List; + +import io.atomix.catalyst.concurrent.Listener; +import io.atomix.group.DistributedGroup; +import io.atomix.group.LocalMember; +import io.atomix.group.messaging.Message; +import io.atomix.group.messaging.MessageConsumer; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.atomix.client.AbstractAtomixClientConsumer; +import org.apache.camel.component.atomix.client.AtomixClientConstants; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.component.atomix.client.AtomixClientConstants.CHANNEL_NAME; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.MEMBER_NAME; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; + +final class AtomixMessagingConsumer extends AbstractAtomixClientConsumer<AtomixMessagingEndpoint> { + private static final Logger LOGGER = LoggerFactory.getLogger(AtomixMessagingConsumer.class); + + private final List<Listener<Message<Object>>> listeners; + private final String resultHeader; + private final String groupName; + private final String memberName; + private final String channelName; + + private LocalMember localMember; + private MessageConsumer<Object> consumer; + + public AtomixMessagingConsumer(AtomixMessagingEndpoint endpoint, Processor processor) { + super(endpoint, processor); + this.listeners = new ArrayList<>(); + this.resultHeader = endpoint.getConfiguration().getResultHeader(); + this.groupName = endpoint.getResourceName(); + this.memberName = endpoint.getConfiguration().getMemberName(); + this.channelName = endpoint.getConfiguration().getChannelName(); + + ObjectHelper.notNull(groupName, RESOURCE_NAME); + ObjectHelper.notNull(memberName, MEMBER_NAME); + ObjectHelper.notNull(channelName, CHANNEL_NAME); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + DistributedGroup group = getAtomixEndpoint().getAtomix().getGroup( + groupName, + new DistributedGroup.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(groupName)), + new DistributedGroup.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(groupName)) + ).join(); + + this.localMember = group.join(memberName).join(); + this.consumer = localMember.messaging().consumer(channelName); + + LOGGER.debug("Subscribe to group: {}, member: {}, channel: {}", groupName, memberName, channelName); + this.listeners.add(consumer.onMessage(this::onMessage)); + } + + @Override + protected void doStop() throws Exception { + // close listeners + listeners.forEach(Listener::close); + + if (this.consumer != null) { + this.consumer.close(); + this.consumer = null; + } + + //if (this.localMember != null) { + // this.localMember.leave().join(); + // this.localMember = null; + //} + + super.doStop(); + } + + // ******************************************** + // Event handler + // ******************************************** + + private void onMessage(Message<Object> message) { + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setHeader(AtomixClientConstants.MESSAGE_ID, message.id()); + + if (resultHeader == null) { + exchange.getIn().setBody(message.message()); + } else { + exchange.getIn().setHeader(resultHeader, message.message()); + } + + try { + getProcessor().process(exchange); + message.ack(); + } catch (Exception e) { + message.fail(); + getExceptionHandler().handleException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingEndpoint.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingEndpoint.java new file mode 100644 index 0000000..558008e --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingEndpoint.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.messaging; + + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.atomix.client.AbstractAtomixClientEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; + +@UriEndpoint( + firstVersion = "2.20.0", + scheme = "atomix-messaging", + title = "Atomix Messaging", + syntax = "atomix-messaging:group", + consumerClass = AtomixMessagingConsumer.class, + label = "clustering") +final class AtomixMessagingEndpoint extends AbstractAtomixClientEndpoint<AtomixMessagingComponent, AtomixMessagingConfiguration> { + @UriParam + private AtomixMessagingConfiguration configuration; + + AtomixMessagingEndpoint(String uri, AtomixMessagingComponent component, String resourceName) { + super(uri, component, resourceName); + } + + @Override + public Producer createProducer() throws Exception { + return new AtomixMessagingProducer(this); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new AtomixMessagingConsumer(this, processor); + } + + @Override + public AtomixMessagingConfiguration getConfiguration() { + return this.configuration; + } + + @Override + public void setConfiguration(AtomixMessagingConfiguration configuration) { + this.configuration = configuration; + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducer.java new file mode 100644 index 0000000..2d50dcb --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducer.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.messaging; + +import io.atomix.group.DistributedGroup; +import io.atomix.group.GroupMember; +import io.atomix.group.messaging.MessageProducer; +import org.apache.camel.AsyncCallback; +import org.apache.camel.InvokeOnHeader; +import org.apache.camel.Message; +import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer; +import org.apache.camel.util.ObjectHelper; + +import static org.apache.camel.component.atomix.client.AtomixClientConstants.BROADCAST_TYPE; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.CHANNEL_NAME; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.MEMBER_NAME; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE; +import static org.apache.camel.component.atomix.client.messaging.AtomixMessaging.OPTIONS_BROADCAST; +import static org.apache.camel.component.atomix.client.messaging.AtomixMessaging.OPTIONS_BROADCAST_RANDOM; +import static org.apache.camel.component.atomix.client.messaging.AtomixMessaging.OPTIONS_DIRECT; + +final class AtomixMessagingProducer extends AbstractAtomixClientProducer<AtomixMessagingEndpoint, DistributedGroup> { + private final AtomixMessagingConfiguration configuration; + + protected AtomixMessagingProducer(AtomixMessagingEndpoint endpoint) { + super(endpoint); + this.configuration = endpoint.getConfiguration(); + } + + // ********************************* + // Handlers + // ********************************* + + @InvokeOnHeader("DIRECT") + boolean onDirect(Message message, AsyncCallback callback) throws Exception { + final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + final String memberName = message.getHeader(MEMBER_NAME, configuration::getMemberName, String.class); + final String channelName = message.getHeader(CHANNEL_NAME, configuration::getChannelName, String.class); + + ObjectHelper.notNull(memberName, MEMBER_NAME); + ObjectHelper.notNull(channelName, CHANNEL_NAME); + ObjectHelper.notNull(value, RESOURCE_VALUE); + + final DistributedGroup group = getResource(message); + final GroupMember member = group.member(memberName); + final MessageProducer<Object> producer = member.messaging().producer(channelName, OPTIONS_DIRECT); + + producer.send(value).thenAccept( + result -> processResult(message, callback, result) + ); + + return false; + } + + @InvokeOnHeader("BROADCAST") + boolean onBroadcast(Message message, AsyncCallback callback) throws Exception { + final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + final String channelName = message.getHeader(CHANNEL_NAME, configuration::getChannelName, String.class); + final AtomixMessaging.BroadcastType type = message.getHeader(BROADCAST_TYPE, configuration::getBroadcastType, AtomixMessaging.BroadcastType.class); + + ObjectHelper.notNull(channelName, CHANNEL_NAME); + ObjectHelper.notNull(value, RESOURCE_VALUE); + + MessageProducer.Options options = type == AtomixMessaging.BroadcastType.RANDOM + ? OPTIONS_BROADCAST_RANDOM + : OPTIONS_BROADCAST; + + final DistributedGroup group = getResource(message); + final MessageProducer<Object> producer = group.messaging().producer(channelName, options); + + producer.send(value).thenRun( + () -> processResult(message, callback, null) + ); + + return false; + } + + // ********************************* + // Implementation + // ********************************* + + @Override + protected String getProcessorKey(Message message) { + return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class); + } + + @Override + protected String getResourceName(Message message) { + return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class); + } + + @Override + protected DistributedGroup createResource(String resourceName) { + return getAtomixEndpoint().getAtomix() + .getGroup( + resourceName, + new DistributedGroup.Config(getAtomixEndpoint().getConfiguration().getResourceOptions(resourceName)), + new DistributedGroup.Options(getAtomixEndpoint().getConfiguration().getResourceConfig(resourceName)) + ).join(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMap.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMap.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMap.java new file mode 100644 index 0000000..5a37d0e --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMap.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.multimap; + +public final class AtomixMultiMap { + + public enum Action { + PUT, + GET, + CLEAR, + SIZE, + CONTAINS_KEY, + //CONTAINS_VALUE, + //CONTAINS_ENTRY, + IS_EMPTY, + REMOVE, + REMOVE_VALUE, + } + + private AtomixMultiMap() { + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapComponent.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapComponent.java new file mode 100644 index 0000000..409027e --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapComponent.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.multimap; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.component.atomix.client.AbstractAtomixClientComponent; + +public final class AtomixMultiMapComponent extends AbstractAtomixClientComponent<AtomixMultiMapConfiguration> { + private AtomixMultiMapConfiguration configuration = new AtomixMultiMapConfiguration(); + + public AtomixMultiMapComponent() { + super(); + } + + public AtomixMultiMapComponent(CamelContext camelContext) { + super(camelContext); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + AtomixMultiMapConfiguration configuration = this.configuration.copy(); + + // Bind options to the configuration object + setConfigurationProperties(configuration, parameters); + + AtomixMultiMapEndpoint endpoint = new AtomixMultiMapEndpoint(uri, this, remaining); + endpoint.setConfiguration(configuration); + + setProperties(endpoint, parameters); + + return endpoint; + } + + // ********************************************** + // Properties + // ********************************************** + + public AtomixMultiMapConfiguration getConfiguration() { + return this.configuration; + } + + /** + * The shared component configuration + */ + public void setConfiguration(AtomixMultiMapConfiguration configuration) { + this.configuration = configuration; + } + + @Override + protected AtomixMultiMapConfiguration getComponentConfiguration() { + return getConfiguration(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapConfiguration.java new file mode 100644 index 0000000..400eb93 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapConfiguration.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.multimap; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.atomix.client.AtomixClientConfiguration; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriParams; + +@UriParams +public class AtomixMultiMapConfiguration extends AtomixClientConfiguration { + @UriParam(defaultValue = "PUT") + private AtomixMultiMap.Action defaultAction = AtomixMultiMap.Action.PUT; + @UriParam + private Object key; + @UriParam + private long ttl; + + // **************************************** + // Properties + // **************************************** + + public AtomixMultiMap.Action getDefaultAction() { + return defaultAction; + } + + /** + * The default action. + */ + public void setDefaultAction(AtomixMultiMap.Action defaultAction) { + this.defaultAction = defaultAction; + } + + public Object getKey() { + return key; + } + + /** + * The key to use if none is set in the header or to listen for events for + * a specific key. + */ + public void setKey(Object defaultKey) { + this.key = defaultKey; + } + + public long getTtl() { + return ttl; + } + + /** + * The resource ttl. + */ + public void setTtl(long ttl) { + this.ttl = ttl; + } + + // **************************************** + // Copy + // **************************************** + + public AtomixMultiMapConfiguration copy() { + try { + return (AtomixMultiMapConfiguration) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3756fba2/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapEndpoint.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapEndpoint.java new file mode 100644 index 0000000..6687c63 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapEndpoint.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.multimap; + + +import org.apache.camel.Producer; +import org.apache.camel.component.atomix.client.AbstractAtomixClientEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; + +@UriEndpoint( + firstVersion = "2.20.0", + scheme = "atomix-multimap", + title = "Atomix MultiMap", + syntax = "atomix-multimap:multiMapName", + consumerOnly = true, + label = "clustering") +final class AtomixMultiMapEndpoint extends AbstractAtomixClientEndpoint<AtomixMultiMapComponent, AtomixMultiMapConfiguration> { + @UriParam + private AtomixMultiMapConfiguration configuration; + + public AtomixMultiMapEndpoint(String uri, AtomixMultiMapComponent component, String resourceName) { + super(uri, component, resourceName); + } + + @Override + public Producer createProducer() throws Exception { + return new AtomixMultiMapProducer(this); + } + + @Override + public AtomixMultiMapConfiguration getConfiguration() { + return this.configuration; + } + + @Override + public void setConfiguration(AtomixMultiMapConfiguration configuration) { + this.configuration = configuration; + } +}