http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenEndpoint.java new file mode 100644 index 0000000..3e1da3c --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenEndpoint.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite.idgen; + +import java.net.URI; +import java.util.Map; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.ignite.AbstractIgniteEndpoint; +import org.apache.camel.component.ignite.IgniteComponent; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.util.ObjectHelper; +import org.apache.ignite.IgniteAtomicSequence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Ignite ID Generator endpoint. + */ +@UriEndpoint(scheme = "ignite:idgen", title = "Ignite ID Generator", syntax = "ignite:idgen:[name]", label = "nosql,cache,compute", producerOnly = true) +public class IgniteIdGenEndpoint extends AbstractIgniteEndpoint { + + private static final Logger LOG = LoggerFactory.getLogger(IgniteIdGenEndpoint.class); + + @UriParam + @Metadata(required = "true") + private String name; + + @UriParam + private Integer batchSize; + + @UriParam(defaultValue = "0") + private Long initialValue = 0L; + + @UriParam + private IgniteIdGenOperation operation; + + public IgniteIdGenEndpoint(String endpointUri, URI remainingUri, Map<String, Object> parameters, IgniteComponent igniteComponent) throws Exception { + super(endpointUri, igniteComponent); + name = remainingUri.getHost(); + + ObjectHelper.notNull(name, "ID Generator name"); + } + + @Override + public Producer createProducer() throws Exception { + IgniteAtomicSequence atomicSeq = ignite().atomicSequence(name, initialValue, false); + + if (atomicSeq == null) { + atomicSeq = ignite().atomicSequence(name, initialValue, true); + LOG.info("Created AtomicSequence of ID Generator with name {}.", name); + } + + if (batchSize != null) { + atomicSeq.batchSize(batchSize); + } + + return new IgniteIdGenProducer(this, atomicSeq); + } + + @Override + public Consumer createConsumer(Processor processor) 
throws Exception { + throw new UnsupportedOperationException("The Ignite Id Generator endpoint doesn't support consumers."); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Long getInitialValue() { + return initialValue; + } + + public void setInitialValue(Long initialValue) { + this.initialValue = initialValue; + } + + public IgniteIdGenOperation getOperation() { + return operation; + } + + public void setOperation(IgniteIdGenOperation operation) { + this.operation = operation; + } + + public Integer getBatchSize() { + return batchSize; + } + + public void setBatchSize(Integer batchSize) { + this.batchSize = batchSize; + } + +}
http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenOperation.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenOperation.java new file mode 100644 index 0000000..d5d1db0 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenOperation.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite.idgen; + +/** + * Enumeration of ID Generator / AtomicSequence operations. 
+ */ +public enum IgniteIdGenOperation { + + ADD_AND_GET, GET, GET_AND_ADD, GET_AND_INCREMENT, INCREMENT_AND_GET + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenProducer.java new file mode 100644 index 0000000..8ca75ae --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/idgen/IgniteIdGenProducer.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite.idgen; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.component.ignite.IgniteConstants; +import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.util.MessageHelper; +import org.apache.ignite.IgniteAtomicSequence; + +/** + * Ignite ID Generator producer. 
+ */ +public class IgniteIdGenProducer extends DefaultAsyncProducer { + + private IgniteIdGenEndpoint endpoint; + private IgniteAtomicSequence atomicSeq; + + public IgniteIdGenProducer(IgniteIdGenEndpoint endpoint, IgniteAtomicSequence atomicSeq) { + super(endpoint); + this.endpoint = endpoint; + this.atomicSeq = atomicSeq; + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + Message in = exchange.getIn(); + Message out = exchange.getOut(); + MessageHelper.copyHeaders(in, out, true); + + Long id = in.getBody(Long.class); + + switch (idGenOperationFor(exchange)) { + + case ADD_AND_GET: + out.setBody(atomicSeq.addAndGet(id)); + break; + + case GET: + out.setBody(atomicSeq.get()); + break; + + case GET_AND_ADD: + out.setBody(atomicSeq.getAndAdd(id)); + break; + + case GET_AND_INCREMENT: + out.setBody(atomicSeq.getAndIncrement()); + break; + + case INCREMENT_AND_GET: + out.setBody(atomicSeq.incrementAndGet()); + break; + + default: + exchange.setException(new UnsupportedOperationException("Operation not supported by Ignite ID Generator producer.")); + return true; + } + + return true; + } + + private IgniteIdGenOperation idGenOperationFor(Exchange exchange) { + return exchange.getIn().getHeader(IgniteConstants.IGNITE_IDGEN_OPERATION, endpoint.getOperation(), IgniteIdGenOperation.class); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java new file mode 100644 index 0000000..6579437 --- /dev/null +++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite.messaging; + +import java.util.UUID; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.component.ignite.IgniteConstants; +import org.apache.camel.impl.DefaultConsumer; +import org.apache.ignite.IgniteMessaging; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IgniteMessagingConsumer extends DefaultConsumer { + + private static final Logger LOG = LoggerFactory.getLogger(IgniteMessagingConsumer.class); + + private IgniteMessagingEndpoint endpoint; + private IgniteMessaging messaging; + + private IgniteBiPredicate<UUID, Object> predicate = new IgniteBiPredicate<UUID, Object>() { + private static final long serialVersionUID = -971933058406324501L; + + @Override + public boolean apply(UUID uuid, Object payload) { + Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly); + Message in = 
exchange.getIn(); + in.setBody(payload); + in.setHeader(IgniteConstants.IGNITE_MESSAGING_TOPIC, endpoint.getTopic()); + in.setHeader(IgniteConstants.IGNITE_MESSAGING_UUID, uuid); + try { + getProcessor().process(exchange); + } catch (Exception e) { + LOG.error(String.format("Exception while processing Ignite Message from topic %s", endpoint.getTopic()), e); + } + return true; + } + }; + + public IgniteMessagingConsumer(IgniteMessagingEndpoint endpoint, Processor processor, IgniteMessaging messaging) { + super(endpoint, processor); + this.endpoint = endpoint; + this.messaging = messaging; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + messaging.localListen(endpoint.getTopic(), predicate); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + messaging.stopLocalListen(endpoint.getTopic(), predicate); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java new file mode 100644 index 0000000..2277a11 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite.messaging; + +import java.net.URI; +import java.util.Map; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.ignite.AbstractIgniteEndpoint; +import org.apache.camel.component.ignite.ClusterGroupExpression; +import org.apache.camel.component.ignite.IgniteComponent; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteMessaging; + +@UriEndpoint(scheme = "ignite:messaging", title = "Ignite Messaging", syntax = "ignite:messaging:[topic]", label = "nosql,cache,messaging") +public class IgniteMessagingEndpoint extends AbstractIgniteEndpoint { + + @UriParam + @Metadata(required = "true") + private String topic; + + @UriParam + private ClusterGroupExpression clusterGroupExpression; + + @UriParam + private IgniteMessagingSendMode sendMode = IgniteMessagingSendMode.UNORDERED; + + @UriParam + private Long timeout; + + public IgniteMessagingEndpoint(String endpointUri, URI remainingUri, Map<String, Object> parameters, IgniteComponent igniteComponent) { + super(endpointUri, igniteComponent); + topic = remainingUri.getHost(); + } + + @Override + public Producer createProducer() throws Exception { + // Validate options. 
+ if (topic == null) { + throw new IllegalStateException("Cannot initialize an Ignite Messaging Producer with a null topic."); + } + + if (sendMode == IgniteMessagingSendMode.ORDERED && timeout == null) { + throw new IllegalStateException("Cannot initialize an Ignite Messaging Producer in ORDERED send mode without a timeout."); + } + + // Initialize the Producer. + IgniteMessaging messaging = createIgniteMessaging(); + return new IgniteMessagingProducer(this, igniteComponent().getIgnite(), messaging); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + // Validate options. + if (topic == null) { + new IllegalStateException("Cannot initialize an Ignite Messaging Producer with a null topic."); + } + + // Initialize the Consumer. + IgniteMessaging messaging = createIgniteMessaging(); + IgniteMessagingConsumer consumer = new IgniteMessagingConsumer(this, processor, messaging); + configureConsumer(consumer); + return consumer; + } + + private IgniteMessaging createIgniteMessaging() { + Ignite ignite = ignite(); + IgniteMessaging messaging = clusterGroupExpression == null ? 
ignite.message() : ignite.message(clusterGroupExpression.getClusterGroup(ignite)); + return messaging; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public ClusterGroupExpression getClusterGroupExpression() { + return clusterGroupExpression; + } + + public void setClusterGroupExpression(ClusterGroupExpression clusterGroupExpression) { + this.clusterGroupExpression = clusterGroupExpression; + } + + public Long getTimeout() { + return timeout; + } + + public void setTimeout(Long timeout) { + this.timeout = timeout; + } + + public IgniteMessagingSendMode getSendMode() { + return sendMode; + } + + public void setSendMode(IgniteMessagingSendMode sendMode) { + this.sendMode = sendMode; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java new file mode 100644 index 0000000..c4946b8 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite.messaging; + +import java.util.Collection; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.component.ignite.IgniteConstants; +import org.apache.camel.component.ignite.IgniteHelper; +import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.util.MessageHelper; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteMessaging; + +public class IgniteMessagingProducer extends DefaultAsyncProducer { + + private IgniteMessagingEndpoint endpoint; + private IgniteMessaging messaging; + + public IgniteMessagingProducer(IgniteMessagingEndpoint endpoint, Ignite ignite, IgniteMessaging messaging) { + super(endpoint); + this.endpoint = endpoint; + this.messaging = messaging; + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + Message in = exchange.getIn(); + Message out = exchange.getOut(); + MessageHelper.copyHeaders(exchange.getIn(), out, true); + + Object body = in.getBody(); + + if (endpoint.getSendMode() == IgniteMessagingSendMode.UNORDERED) { + if (body instanceof Collection<?> && !endpoint.isTreatCollectionsAsCacheObjects()) { + messaging.send(topicFor(exchange), (Collection<?>) body); + } else { + messaging.send(topicFor(exchange), body); + } + } else { + messaging.sendOrdered(topicFor(exchange), body, endpoint.getTimeout()); + } + + IgniteHelper.maybePropagateIncomingBody(endpoint, in, out); + + return true; + } + + private String topicFor(Exchange exchange) 
{ + return exchange.getIn().getHeader(IgniteConstants.IGNITE_MESSAGING_TOPIC, endpoint.getTopic(), String.class); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java new file mode 100644 index 0000000..0bf472c --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite.messaging; + +public enum IgniteMessagingSendMode { + + ORDERED, UNORDERED + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueEndpoint.java new file mode 100644 index 0000000..688a209 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueEndpoint.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite.queue; + +import java.net.URI; +import java.util.Map; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.ignite.AbstractIgniteEndpoint; +import org.apache.camel.component.ignite.IgniteComponent; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.util.EndpointHelper; +import org.apache.camel.util.IntrospectionSupport; +import org.apache.camel.util.ObjectHelper; +import org.apache.ignite.IgniteQueue; +import org.apache.ignite.configuration.CollectionConfiguration; + +/** + * Ignite Queue endpoint. + */ +@UriEndpoint(scheme = "ignite:queue", title = "Ignite Queues", syntax = "ignite:queue:[name]", label = "nosql,cache", producerOnly = true) +public class IgniteQueueEndpoint extends AbstractIgniteEndpoint { + + @UriParam @Metadata(required = "true") + private String name; + + @UriParam + private int capacity; + + @UriParam + private CollectionConfiguration configuration = new CollectionConfiguration(); + + @UriParam + private Long timeoutMillis; + + @UriParam + private IgniteQueueOperation operation; + + public IgniteQueueEndpoint(String endpointUri, URI remainingUri, Map<String, Object> parameters, IgniteComponent igniteComponent) throws Exception { + super(endpointUri, igniteComponent); + name = remainingUri.getHost(); + + ObjectHelper.notNull(name, "Queue name"); + + // Set the configuration values. 
+ if (!parameters.containsKey("configuration")) { + Map<String, Object> configProps = IntrospectionSupport.extractProperties(parameters, "config."); + EndpointHelper.setReferenceProperties(this.getCamelContext(), configProps, parameters); + EndpointHelper.setProperties(this.getCamelContext(), configProps, parameters); + } + } + + @Override + public Producer createProducer() throws Exception { + IgniteQueue<Object> queue = ignite().queue(name, capacity, configuration); + + return new IgniteQueueProducer(this, queue); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + throw new UnsupportedOperationException("The Ignite Queue endpoint doesn't support consumers."); + } + + /** + * Gets the queue name. + * + * @return + */ + public String getName() { + return name; + } + + /** + * Sets the queue name. + * + * @param name + */ + public void setName(String name) { + this.name = name; + } + + /** + * Gets the queue operation to perform. + * + * @return + */ + public IgniteQueueOperation getOperation() { + return operation; + } + + /** + * Sets the queue operation to perform. + * + * @param operation + */ + public void setOperation(IgniteQueueOperation operation) { + this.operation = operation; + } + + /** + * Gets the queue capacity. Default: non-bounded. + * + * @return + */ + public int getCapacity() { + return capacity; + } + + /** + * Sets the queue capacity. Default: non-bounded. + * + * @param capacity + */ + public void setCapacity(int capacity) { + this.capacity = capacity; + } + + /** + * Gets the collection configuration. Default: empty configuration. + * + * @return + */ + public CollectionConfiguration getConfiguration() { + return configuration; + } + + /** + * Sets the collection configuration. Default: empty configuration. + * <p> + * You can also conveniently set inner properties by using <tt>configuration.xyz=123</tt> options. 
+ * + * @param configuration + */ + public void setConfiguration(CollectionConfiguration configuration) { + this.configuration = configuration; + } + + /** + * Gets the queue timeout in milliseconds. Default: no timeout. + * + * @return + */ + public Long getTimeoutMillis() { + return timeoutMillis; + } + + /** + * Sets the queue timeout in milliseconds. Default: no timeout. + * + * @param timeoutMillis + */ + public void setTimeoutMillis(Long timeoutMillis) { + this.timeoutMillis = timeoutMillis; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueOperation.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueOperation.java new file mode 100644 index 0000000..a342bcd --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueOperation.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite.queue; + +/** + * Enumeration of queue operations. + */ +public enum IgniteQueueOperation { + + CONTAINS, ADD, SIZE, REMOVE, ITERATOR, CLEAR, RETAIN_ALL, ARRAY, DRAIN, ELEMENT, PEEK, OFFER, POLL, TAKE, PUT + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueProducer.java new file mode 100644 index 0000000..191e62f --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/queue/IgniteQueueProducer.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite.queue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.component.ignite.IgniteConstants; +import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.util.MessageHelper; +import org.apache.ignite.IgniteQueue; + +public class IgniteQueueProducer extends DefaultAsyncProducer { + + private IgniteQueueEndpoint endpoint; + private IgniteQueue<Object> queue; + + public IgniteQueueProducer(IgniteQueueEndpoint endpoint, IgniteQueue<Object> queue) { + super(endpoint); + this.endpoint = endpoint; + this.queue = queue; + } + + @Override + @SuppressWarnings("unchecked") + public boolean process(Exchange exchange, AsyncCallback callback) { + Message in = exchange.getIn(); + Message out = exchange.getOut(); + MessageHelper.copyHeaders(exchange.getIn(), out, true); + + Object body = in.getBody(); + Long millis; + + switch (queueOperationFor(exchange)) { + + case ADD: + if (Collection.class.isAssignableFrom(body.getClass()) && !endpoint.isTreatCollectionsAsCacheObjects()) { + out.setBody(queue.addAll((Collection<? extends Object>) body)); + } else { + out.setBody(queue.add(body)); + } + break; + + case CONTAINS: + if (Collection.class.isAssignableFrom(body.getClass()) && !endpoint.isTreatCollectionsAsCacheObjects()) { + out.setBody(queue.containsAll((Collection<? extends Object>) body)); + } else { + out.setBody(queue.contains(body)); + } + break; + + case SIZE: + out.setBody(queue.size()); + break; + + case REMOVE: + if (Collection.class.isAssignableFrom(body.getClass()) && !endpoint.isTreatCollectionsAsCacheObjects()) { + out.setBody(queue.removeAll((Collection<? 
extends Object>) body)); + } else { + out.setBody(queue.remove(body)); + } + break; + + case CLEAR: + if (endpoint.isPropagateIncomingBodyIfNoReturnValue()) { + out.setBody(body); + } + queue.clear(); + break; + + case ITERATOR: + Iterator<?> iterator = queue.iterator(); + out.setBody(iterator); + break; + + case ARRAY: + out.setBody(queue.toArray()); + break; + + case RETAIN_ALL: + if (Collection.class.isAssignableFrom(body.getClass())) { + out.setBody(queue.retainAll((Collection<? extends Object>) body)); + } else { + out.setBody(queue.retainAll(Collections.singleton(body))); + } + break; + + case DRAIN: + Integer maxElements = in.getHeader(IgniteConstants.IGNITE_QUEUE_MAX_ELEMENTS, Integer.class); + + Collection<Object> col = null; + if (body != null && Collection.class.isAssignableFrom(body.getClass())) { + col = (Collection<Object>) body; + } else { + col = maxElements != null ? new ArrayList<>(maxElements) : new ArrayList<>(); + } + + int transferred = -1; + if (maxElements == null) { + transferred = queue.drainTo(col); + } else { + transferred = queue.drainTo(col, maxElements); + } + out.setBody(col); + out.setHeader(IgniteConstants.IGNITE_QUEUE_TRANSFERRED_COUNT, transferred); + break; + + case ELEMENT: + out.setBody(queue.element()); + break; + + case OFFER: + millis = in.getHeader(IgniteConstants.IGNITE_QUEUE_TIMEOUT_MILLIS, endpoint.getTimeoutMillis(), Long.class); + boolean result = millis == null ? queue.offer(body) : queue.offer(body, millis, TimeUnit.MILLISECONDS); + out.setBody(result); + break; + + case PEEK: + out.setBody(queue.peek()); + break; + + case POLL: + millis = in.getHeader(IgniteConstants.IGNITE_QUEUE_TIMEOUT_MILLIS, endpoint.getTimeoutMillis(), Long.class); + out.setBody(millis == null ? 
queue.poll() : queue.poll(millis, TimeUnit.MILLISECONDS)); + break; + + case PUT: + if (endpoint.isPropagateIncomingBodyIfNoReturnValue()) { + out.setBody(in.getBody()); + } + queue.put(body); + break; + + case TAKE: + out.setBody(queue.take()); + break; + + default: + exchange.setException(new UnsupportedOperationException("Operation not supported by Ignite Queue producer.")); + return true; + } + + return true; + } + + private IgniteQueueOperation queueOperationFor(Exchange exchange) { + return exchange.getIn().getHeader(IgniteConstants.IGNITE_QUEUE_OPERATION, endpoint.getOperation(), IgniteQueueOperation.class); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetEndpoint.java new file mode 100644 index 0000000..6125ff9 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetEndpoint.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite.set; + +import java.net.URI; +import java.util.Map; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.ignite.AbstractIgniteEndpoint; +import org.apache.camel.component.ignite.IgniteComponent; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.util.EndpointHelper; +import org.apache.camel.util.IntrospectionSupport; +import org.apache.camel.util.ObjectHelper; +import org.apache.ignite.configuration.CollectionConfiguration; + +/** + * Ignite Sets endpoint. + */ +@UriEndpoint(scheme = "ignite:set", title = "Ignite Sets", syntax = "ignite:set:[name]", label = "nosql,cache", producerOnly = true) +public class IgniteSetEndpoint extends AbstractIgniteEndpoint { + + @UriParam @Metadata(required = "true") + private String name; + + @UriParam + private CollectionConfiguration configuration = new CollectionConfiguration(); + + @UriParam + private IgniteSetOperation operation; + + public IgniteSetEndpoint(String endpointUri, URI remainingUri, Map<String, Object> parameters, IgniteComponent igniteComponent) throws Exception { + super(endpointUri, igniteComponent); + name = remainingUri.getHost(); + + ObjectHelper.notNull(name, "Set name"); + + // Set the configuration values. 
+ if (!parameters.containsKey("configuration")) { + Map<String, Object> configProps = IntrospectionSupport.extractProperties(parameters, "config."); + EndpointHelper.setReferenceProperties(this.getCamelContext(), configProps, parameters); + EndpointHelper.setProperties(this.getCamelContext(), configProps, parameters); + } + + } + + @Override + public Producer createProducer() throws Exception { + return new IgniteSetProducer(this, ignite().set(name, configuration)); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + throw new UnsupportedOperationException("The Ignite Sets endpoint doesn't support consumers."); + } + + /** + * Gets the set name. + * + * @return + */ + public String getName() { + return name; + } + + /** + * Sets the set name. + * + * @param name + */ + public void setName(String name) { + this.name = name; + } + + /** + * Gets the collection configuration. Default: empty configuration. + * + * @return + */ + public CollectionConfiguration getConfiguration() { + return configuration; + } + + /** + * Sets the collection configuration. Default: empty configuration. + * <p> + * You can also conveniently set inner properties by using <tt>configuration.xyz=123</tt> options. + * + * @param configuration + */ + public void setConfiguration(CollectionConfiguration configuration) { + this.configuration = configuration; + } + + /** + * Gets the set operation to perform. + * + * @return + */ + public IgniteSetOperation getOperation() { + return operation; + } + + /** + * Sets the set operation to perform. 
+ * + * @param operation + */ + public void setOperation(IgniteSetOperation operation) { + this.operation = operation; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetOperation.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetOperation.java new file mode 100644 index 0000000..ad054e9 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetOperation.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite.set; + +/** + * Enumeration of Set operations. 
+ */ +public enum IgniteSetOperation { + + CONTAINS, ADD, SIZE, REMOVE, ITERATOR, CLEAR, RETAIN_ALL, ARRAY + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetProducer.java new file mode 100644 index 0000000..918a0aa --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/set/IgniteSetProducer.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite.set; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.component.ignite.IgniteConstants; +import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.util.MessageHelper; +import org.apache.ignite.IgniteSet; + +/** + * The Ignite Set producer. + */ +public class IgniteSetProducer extends DefaultAsyncProducer { + + private IgniteSetEndpoint endpoint; + private IgniteSet<Object> set; + + public IgniteSetProducer(IgniteSetEndpoint endpoint, IgniteSet<Object> set) { + super(endpoint); + this.endpoint = endpoint; + this.set = set; + } + + @Override + @SuppressWarnings("unchecked") + public boolean process(Exchange exchange, AsyncCallback callback) { + Message in = exchange.getIn(); + Message out = exchange.getOut(); + MessageHelper.copyHeaders(exchange.getIn(), out, true); + + Object body = in.getBody(); + + switch (setOperationFor(exchange)) { + + case ADD: + if (Collection.class.isAssignableFrom(body.getClass()) && !endpoint.isTreatCollectionsAsCacheObjects()) { + out.setBody(set.addAll((Collection<? extends Object>) body)); + } else { + out.setBody(set.add(body)); + } + break; + + case CONTAINS: + if (Collection.class.isAssignableFrom(body.getClass()) && !endpoint.isTreatCollectionsAsCacheObjects()) { + out.setBody(set.containsAll((Collection<? extends Object>) body)); + } else { + out.setBody(set.contains(body)); + } + break; + + case SIZE: + out.setBody(set.size()); + break; + + case REMOVE: + if (Collection.class.isAssignableFrom(body.getClass()) && !endpoint.isTreatCollectionsAsCacheObjects()) { + out.setBody(set.removeAll((Collection<? 
extends Object>) body)); + } else { + out.setBody(set.remove(body)); + } + break; + + case CLEAR: + if (endpoint.isPropagateIncomingBodyIfNoReturnValue()) { + out.setBody(body); + } + set.clear(); + break; + + case ITERATOR: + Iterator<?> iterator = set.iterator(); + out.setBody(iterator); + break; + + case ARRAY: + out.setBody(set.toArray()); + break; + + case RETAIN_ALL: + if (Collection.class.isAssignableFrom(body.getClass())) { + out.setBody(set.retainAll((Collection<? extends Object>) body)); + } else { + out.setBody(set.retainAll(Collections.singleton(body))); + } + break; + + default: + exchange.setException(new UnsupportedOperationException("Operation not supported by Ignite Set producer.")); + return true; + } + + return true; + } + + private IgniteSetOperation setOperationFor(Exchange exchange) { + return exchange.getIn().getHeader(IgniteConstants.IGNITE_SETS_OPERATION, endpoint.getOperation(), IgniteSetOperation.class); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/resources/META-INF/LICENSE.txt ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/resources/META-INF/LICENSE.txt b/components/camel-ignite/src/main/resources/META-INF/LICENSE.txt new file mode 100644 index 0000000..6b0b127 --- /dev/null +++ b/components/camel-ignite/src/main/resources/META-INF/LICENSE.txt @@ -0,0 +1,203 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. 
+ + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/resources/META-INF/NOTICE.txt ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/resources/META-INF/NOTICE.txt b/components/camel-ignite/src/main/resources/META-INF/NOTICE.txt new file mode 100644 index 0000000..2e215bf --- /dev/null +++ b/components/camel-ignite/src/main/resources/META-INF/NOTICE.txt @@ -0,0 +1,11 @@ + ========================================================================= + == NOTICE file corresponding to the section 4 d of == + == the Apache License, Version 2.0, == + == in this case for the Apache Camel distribution. 
== + ========================================================================= + + This product includes software developed by + The Apache Software Foundation (http://www.apache.org/). + + Please read the different LICENSE files present in the licenses directory of + this distribution. http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/resources/META-INF/services/org/apache/camel/component/ignite ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/resources/META-INF/services/org/apache/camel/component/ignite b/components/camel-ignite/src/main/resources/META-INF/services/org/apache/camel/component/ignite new file mode 100644 index 0000000..fca42c6 --- /dev/null +++ b/components/camel-ignite/src/main/resources/META-INF/services/org/apache/camel/component/ignite @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +class=org.apache.camel.component.ignite.IgniteComponent http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/AbstractIgniteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/AbstractIgniteTest.java b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/AbstractIgniteTest.java new file mode 100644 index 0000000..d46cd1b --- /dev/null +++ b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/AbstractIgniteTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite; + +import java.util.Collections; +import java.util.UUID; + +import org.apache.camel.CamelContext; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.ignite.Ignite; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.EventType; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; + +public class AbstractIgniteTest extends CamelTestSupport { + + /** Ip finder for TCP discovery. */ + private static final TcpDiscoveryIpFinder LOCAL_IP_FINDER = new TcpDiscoveryVmIpFinder(false) { { + setAddresses(Collections.singleton("127.0.0.1:47500..47509")); + } }; + + private Ignite ignite; + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.addComponent("ignite", buildComponent()); + return context; + } + + protected IgniteComponent buildComponent() { + IgniteConfiguration config = new IgniteConfiguration(); + config.setGridName(UUID.randomUUID().toString()); + config.setIncludeEventTypes(EventType.EVT_JOB_FINISHED, EventType.EVT_JOB_RESULTED); + config.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(LOCAL_IP_FINDER)); + + return IgniteComponent.fromConfiguration(config); + } + + protected Ignite ignite() { + if (ignite == null) { + ignite = context.getComponent("ignite", IgniteComponent.class).getIgnite(); + } + return ignite; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheContinuousQueryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheContinuousQueryTest.java 
b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheContinuousQueryTest.java new file mode 100644 index 0000000..c44b7f0 --- /dev/null +++ b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheContinuousQueryTest.java @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite; + +import java.io.Serializable; +import java.util.Map; +import java.util.Objects; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.EventType; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterators; +import com.google.common.collect.Maps; + +import org.apache.camel.Exchange; +import org.apache.camel.Route; +import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.junit.After; +import org.junit.Test; + +import static com.google.common.truth.Truth.assert_; + +public class IgniteCacheContinuousQueryTest extends AbstractIgniteTest implements Serializable { + + private static final long serialVersionUID = 1L; + + @Test + public void testContinuousQueryDoNotFireExistingEntries() throws Exception { + context.startRoute("continuousQuery"); + + getMockEndpoint("mock:test1").expectedMessageCount(100); + + Map<Integer, Person> persons = createPersons(1, 100); + IgniteCache<Integer, Person> cache = ignite().getOrCreateCache("testcontinuous1"); + cache.putAll(persons); + + assertMockEndpointsSatisfied(); + + for (Exchange exchange : getMockEndpoint("mock:test1").getExchanges()) { + assert_().that(exchange.getIn().getHeader(IgniteConstants.IGNITE_CACHE_NAME)).isEqualTo("testcontinuous1"); + assert_().that(exchange.getIn().getHeader(IgniteConstants.IGNITE_CACHE_EVENT_TYPE)).isEqualTo(EventType.CREATED); + assert_().that(exchange.getIn().getHeader(IgniteConstants.IGNITE_CACHE_KEY)).isIn(persons.keySet()); + assert_().that(exchange.getIn().getBody()).isIn(persons.values()); + } + } + + @Test + public void 
testContinuousQueryFireExistingEntriesWithQuery() throws Exception { + getMockEndpoint("mock:test2").expectedMessageCount(50); + + Map<Integer, Person> persons = createPersons(1, 100); + IgniteCache<Integer, Person> cache = ignite().getOrCreateCache("testcontinuous1"); + cache.putAll(persons); + + context.startRoute("continuousQuery.fireExistingEntries"); + + assertMockEndpointsSatisfied(); + + resetMocks(); + + getMockEndpoint("mock:test2").expectedMessageCount(100); + + persons = createPersons(101, 100); + cache.putAll(persons); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testContinuousQueryFireExistingEntriesWithQueryAndRemoteFilter() throws Exception { + getMockEndpoint("mock:test3").expectedMessageCount(50); + + Map<Integer, Person> persons = createPersons(1, 100); + IgniteCache<Integer, Person> cache = ignite().getOrCreateCache("testcontinuous1"); + cache.putAll(persons); + + context.startRoute("remoteFilter"); + + assertMockEndpointsSatisfied(); + + resetMocks(); + + getMockEndpoint("mock:test3").expectedMessageCount(50); + + persons = createPersons(101, 100); + cache.putAll(persons); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testContinuousQueryGroupedUpdates() throws Exception { + // One hundred Iterables of 1 item each. 
+ getMockEndpoint("mock:test4").expectedMessageCount(100); + + context.startRoute("groupedUpdate"); + + Map<Integer, Person> persons = createPersons(1, 100); + IgniteCache<Integer, Person> cache = ignite().getOrCreateCache("testcontinuous1"); + cache.putAll(persons); + + assertMockEndpointsSatisfied(); + + for (Exchange exchange : getMockEndpoint("mock:test4").getExchanges()) { + assert_().that(exchange.getIn().getHeader(IgniteConstants.IGNITE_CACHE_NAME)).isEqualTo("testcontinuous1"); + assert_().that(exchange.getIn().getBody()).isInstanceOf(Iterable.class); + assert_().that(Iterators.size(exchange.getIn().getBody(Iterable.class).iterator())).isEqualTo(1); + } + + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("ignite:cache:testcontinuous1?query=#query1").routeId("continuousQuery").noAutoStartup().to("mock:test1"); + + from("ignite:cache:testcontinuous1?query=#query1&fireExistingQueryResults=true").routeId("continuousQuery.fireExistingEntries").noAutoStartup().to("mock:test2"); + + from("ignite:cache:testcontinuous1?query=#query1&remoteFilter=#remoteFilter1&fireExistingQueryResults=true").routeId("remoteFilter").noAutoStartup().to("mock:test3"); + + from("ignite:cache:testcontinuous1?pageSize=10&oneExchangePerUpdate=false").routeId("groupedUpdate").noAutoStartup().to("mock:test4"); + + } + }; + } + + private Map<Integer, Person> createPersons(int from, int count) { + Map<Integer, Person> answer = Maps.newHashMap(); + int max = from + count; + for (int i = from; i < max; i++) { + answer.put(i, Person.create(i, "name" + i, "surname" + i)); + } + return answer; + } + + @Override + public boolean isCreateCamelContextPerClass() { + return true; + } + + @After + public void deleteCaches() { + for (String cacheName : ImmutableSet.<String> of("testcontinuous1", "testcontinuous2", "testcontinuous3")) { + IgniteCache<?, ?> cache = 
ignite().cache(cacheName); + if (cache == null) { + continue; + } + cache.clear(); + } + } + + @After + public void stopAllRoutes() throws Exception { + for (Route route : context.getRoutes()) { + if (context.getRouteStatus(route.getId()) != ServiceStatus.Started) { + return; + } + context.stopRoute(route.getId()); + } + + resetMocks(); + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry answer = super.createRegistry(); + + ScanQuery<Integer, Person> scanQuery1 = new ScanQuery<>(new IgniteBiPredicate<Integer, Person>() { + private static final long serialVersionUID = 1L; + + @Override + public boolean apply(Integer key, Person person) { + return person.getId() > 50; + } + }); + + CacheEntryEventSerializableFilter<Integer, Person> remoteFilter = new CacheEntryEventSerializableFilter<Integer, IgniteCacheContinuousQueryTest.Person>() { + private static final long serialVersionUID = 5624973479995548199L; + + @Override + public boolean evaluate(CacheEntryEvent<? extends Integer, ? 
extends Person> event) throws CacheEntryListenerException { + return event.getValue().getId() > 150; + } + }; + + answer.bind("query1", scanQuery1); + answer.bind("remoteFilter1", remoteFilter); + + return answer; + } + + public static class Person implements Serializable { + private static final long serialVersionUID = -6582521698437964648L; + + private Integer id; + private String name; + private String surname; + + public static Person create(Integer id, String name, String surname) { + Person p = new Person(); + p.setId(id); + p.setName(name); + p.setSurname(surname); + return p; + } + + public Integer getId() { + return id; + } + + public void setId(Integer id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getSurname() { + return surname; + } + + public void setSurname(String surname) { + this.surname = surname; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((id == null) ? 0 : id.hashCode()); + result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((surname == null) ? 
0 : surname.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof Person)) { + return false; + } + + if (this == obj) { + return true; + } + + Person other = (Person) obj; + return Objects.equals(this.id, other.id) && Objects.equals(this.name, other.name) && Objects.equals(this.surname, other.surname); + } + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheTest.java b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheTest.java new file mode 100644 index 0000000..b936e1b --- /dev/null +++ b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheTest.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite; + + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import javax.cache.Cache.Entry; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; + +import org.apache.camel.CamelException; +import org.apache.camel.component.ignite.cache.IgniteCacheOperation; +import org.apache.camel.util.ObjectHelper; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.query.Query; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.junit.After; +import org.junit.Test; + +import static com.google.common.truth.Truth.assert_; + +public class IgniteCacheTest extends AbstractIgniteTest { + + @Test + public void testAddEntry() { + template.requestBodyAndHeader("ignite:cache:testcache1?operation=PUT", "1234", IgniteConstants.IGNITE_CACHE_KEY, "abcd"); + + assert_().that(ignite().cache("testcache1").size(CachePeekMode.ALL)).isEqualTo(1); + assert_().that(ignite().cache("testcache1").get("abcd")).isEqualTo("1234"); + } + + @Test + public void testAddEntrySet() { + template.requestBody("ignite:cache:testcache1?operation=PUT", ImmutableMap.of("abcd", "1234", "efgh", "5678")); + + assert_().that(ignite().cache("testcache1").size(CachePeekMode.ALL)).isEqualTo(2); + assert_().that(ignite().cache("testcache1").get("abcd")).isEqualTo("1234"); + assert_().that(ignite().cache("testcache1").get("efgh")).isEqualTo("5678"); + } + + @Test + public void testGetOne() { + testAddEntry(); + + String result = template.requestBody("ignite:cache:testcache1?operation=GET", "abcd", String.class); + assert_().that(result).isEqualTo("1234"); + + result = template.requestBodyAndHeader("ignite:cache:testcache1?operation=GET", "this value 
won't be used", IgniteConstants.IGNITE_CACHE_KEY, "abcd", String.class); + assert_().that(result).isEqualTo("1234"); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetMany() { + IgniteCache<String, String> cache = ignite().getOrCreateCache("testcache1"); + Set<String> keys = new HashSet<>(); + + for (int i = 0; i < 100; i++) { + cache.put("k" + i, "v" + i); + keys.add("k" + i); + } + + Map<String, String> result = template.requestBody("ignite:cache:testcache1?operation=GET", keys, Map.class); + for (String k : keys) { + assert_().that(result.get(k)).isEqualTo(k.replace("k", "v")); + } + } + + @Test + public void testGetSize() { + IgniteCache<String, String> cache = ignite().getOrCreateCache("testcache1"); + Set<String> keys = new HashSet<>(); + + for (int i = 0; i < 100; i++) { + cache.put("k" + i, "v" + i); + keys.add("k" + i); + } + + Integer result = template.requestBody("ignite:cache:testcache1?operation=SIZE", keys, Integer.class); + assert_().that(result).isEqualTo(100); + } + + @Test + @SuppressWarnings("unchecked") + public void testQuery() { + IgniteCache<String, String> cache = ignite().getOrCreateCache("testcache1"); + Set<String> keys = new HashSet<>(); + + for (int i = 0; i < 100; i++) { + cache.put("k" + i, "v" + i); + keys.add("k" + i); + } + + Query<Entry<String, String>> query = new ScanQuery<String, String>(new IgniteBiPredicate<String, String>() { + private static final long serialVersionUID = 1L; + + @Override + public boolean apply(String key, String value) { + return Integer.parseInt(key.replace("k", "")) >= 50; + } + }); + + Iterator<String> iter = template.requestBodyAndHeader("ignite:cache:testcache1?operation=QUERY", keys, IgniteConstants.IGNITE_CACHE_QUERY, query, Iterator.class); + ArrayList<Object> results = Lists.newArrayList(Iterators.toArray(iter, Object.class)); + assert_().that(results.size()).isEqualTo(50); + } + + @Test + public void testGetManyTreatCollectionsAsCacheObjects() { + IgniteCache<Object, String> 
cache = ignite().getOrCreateCache("testcache1"); + Set<String> keys = new HashSet<>(); + + for (int i = 0; i < 100; i++) { + cache.put("k" + i, "v" + i); + keys.add("k" + i); + } + + // Also add a cache entry with the entire Set as a key. + cache.put(keys, "---"); + + String result = template.requestBody("ignite:cache:testcache1?operation=GET&treatCollectionsAsCacheObjects=true", keys, String.class); + assert_().that(result).isEqualTo("---"); + } + + @Test + public void testRemoveEntry() { + IgniteCache<String, String> cache = ignite().getOrCreateCache("testcache1"); + + cache.put("abcd", "1234"); + cache.put("efgh", "5678"); + + assert_().that(cache.size(CachePeekMode.ALL)).isEqualTo(2); + + template.requestBody("ignite:cache:testcache1?operation=REMOVE", "abcd"); + + assert_().that(cache.size(CachePeekMode.ALL)).isEqualTo(1); + assert_().that(cache.get("abcd")).isNull(); + + template.requestBodyAndHeader("ignite:cache:testcache1?operation=REMOVE", "this value won't be used", IgniteConstants.IGNITE_CACHE_KEY, "efgh"); + + assert_().that(cache.size(CachePeekMode.ALL)).isEqualTo(0); + assert_().that(cache.get("efgh")).isNull(); + + } + + @Test + public void testClearCache() { + IgniteCache<String, String> cache = ignite().getOrCreateCache("testcache1"); + for (int i = 0; i < 100; i++) { + cache.put("k" + i, "v" + i); + } + + assert_().that(cache.size(CachePeekMode.ALL)).isEqualTo(100); + + template.requestBody("ignite:cache:testcache1?operation=CLEAR", "this value won't be used"); + + assert_().that(cache.size(CachePeekMode.ALL)).isEqualTo(0); + } + + @Test + public void testHeaderSetRemoveEntry() { + testAddEntry(); + + String result = template.requestBody("ignite:cache:testcache1?operation=GET", "abcd", String.class); + assert_().that(result).isEqualTo("1234"); + + result = template.requestBodyAndHeader("ignite:cache:testcache1?operation=GET", "abcd", IgniteConstants.IGNITE_CACHE_OPERATION, IgniteCacheOperation.REMOVE, String.class); + + // The body has not 
changed, but the cache entry is gone. + assert_().that(result).isEqualTo("abcd"); + assert_().that(ignite().cache("testcache1").size(CachePeekMode.ALL)).isEqualTo(0); + } + + @Test + public void testAddEntryNoCacheCreation() { + try { + template.requestBodyAndHeader("ignite:cache:testcache2?operation=PUT&failIfInexistentCache=true", "1234", IgniteConstants.IGNITE_CACHE_KEY, "abcd"); + } catch (Exception e) { + assert_().that(ObjectHelper.getException(CamelException.class, e).getMessage()).startsWith("Ignite cache testcache2 doesn't exist"); + return; + } + + fail("Should have thrown an exception"); + } + + @Test + public void testAddEntryDoNotPropagateIncomingBody() { + Object result = template.requestBodyAndHeader("ignite:cache:testcache1?operation=PUT&propagateIncomingBodyIfNoReturnValue=false", "1234", IgniteConstants.IGNITE_CACHE_KEY, "abcd", + Object.class); + + assert_().that(ignite().cache("testcache1").size(CachePeekMode.ALL)).isEqualTo(1); + assert_().that(ignite().cache("testcache1").get("abcd")).isEqualTo("1234"); + + assert_().that(result).isNull(); + } + + @Override + public boolean isCreateCamelContextPerClass() { + return true; + } + + @After + public void deleteCaches() { + for (String cacheName : ImmutableSet.<String> of("testcache1", "testcache2")) { + IgniteCache<?, ?> cache = ignite().cache(cacheName); + if (cache == null) { + continue; + } + cache.clear(); + } + } + +}