Repository: camel
Updated Branches:
  refs/heads/master d20d0be4c -> 0350423d9


http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteComputeTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteComputeTest.java
 
b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteComputeTest.java
new file mode 100644
index 0000000..fd66450
--- /dev/null
+++ 
b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteComputeTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.events.EventType;
+import org.junit.After;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assert_;
+
+public class IgniteComputeTest extends AbstractIgniteTest {
+
+    // Extra Ignite nodes started through startAdditionalGridInstance().
+    private static final List<Ignite> ADDITIONAL_INSTANCES = 
Lists.newArrayList();
+    // Ids returned by events().remoteListen(), kept so they can be stopped.
+    private static final List<UUID> LISTENERS = Lists.newArrayList();
+
+    /**
+     * Sends TestIgniteComputeResources.TEST_CALLABLE to an endpoint with
+     * executionType=EXECUTE and expects a failure whose RuntimeCamelException
+     * message starts with "Ignite Compute endpoint with EXECUTE".
+     */
+    @Test
+    public void testExecuteWithWrongPayload() {
+        try {
+            template.requestBody("ignite:compute:abc?executionType=EXECUTE", 
TestIgniteComputeResources.TEST_CALLABLE, String.class);
+        } catch (Exception e) {
+            // Locate the RuntimeCamelException via ObjectHelper and check its message.
+            
assert_().that(ObjectHelper.getException(RuntimeCamelException.class, 
e).getMessage()).startsWith("Ignite Compute endpoint with EXECUTE");
+            return;
+        }
+
+        // Reached only when the request did not throw.
+        fail();
+    }
+
+    /**
+     * Exercises executionType=CALL with a single payload, a list of five
+     * payloads, and the same list combined with the reducer header.
+     * The unchecked warning is suppressed because requestBody() is asked for
+     * a raw Collection that is assigned to Collection&lt;String&gt;.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testCall() {
+        TestIgniteComputeResources.COUNTER.set(0);
+
+        // Single Callable.
+        String result = 
template.requestBody("ignite:compute:abc?executionType=CALL", 
TestIgniteComputeResources.TEST_CALLABLE, String.class);
+
+        assert_().that(result).isEqualTo("hello");
+
+        // Collection of Callables.
+        Object[] callables = new Object[5];
+        Arrays.fill(callables, TestIgniteComputeResources.TEST_CALLABLE);
+        Collection<String> colResult = 
template.requestBody("ignite:compute:abc?executionType=CALL", 
Lists.newArrayList(callables), Collection.class);
+
+        assert_().that(colResult).containsExactly("hello", "hello", "hello", 
"hello", "hello").inOrder();
+
+        // Callables with a Reducer.
+        String reduced = 
template.requestBodyAndHeader("ignite:compute:abc?executionType=CALL", 
Lists.newArrayList(callables), IgniteConstants.IGNITE_COMPUTE_REDUCER,
+                TestIgniteComputeResources.STRING_JOIN_REDUCER, String.class);
+
+        // The five "hello" results are expected to be joined without a separator.
+        assert_().that(reduced).isEqualTo("hellohellohellohellohello");
+    }
+
+    /**
+     * Exercises executionType=RUN with one payload and with a list of five,
+     * checking that the reply body is null and that
+     * TestIgniteComputeResources.COUNTER reaches 1 and then 6.
+     */
+    @Test
+    public void testRun() {
+        TestIgniteComputeResources.COUNTER.set(0);
+
+        // Single Runnable.
+        Object result = 
template.requestBody("ignite:compute:abc?executionType=RUN", 
TestIgniteComputeResources.TEST_RUNNABLE_COUNTER, Object.class);
+        assert_().that(result).isNull();
+        assert_().that(TestIgniteComputeResources.COUNTER.get()).isEqualTo(1);
+
+        // Multiple Runnables.
+        Object[] runnables = new Object[5];
+        Arrays.fill(runnables, 
TestIgniteComputeResources.TEST_RUNNABLE_COUNTER);
+        result = template.requestBody("ignite:compute:abc?executionType=RUN", 
Lists.newArrayList(runnables), Collection.class);
+        assert_().that(result).isNull();
+        // 1 from the single run above plus 5 from this list.
+        assert_().that(TestIgniteComputeResources.COUNTER.get()).isEqualTo(6);
+    }
+
+    /**
+     * Exercises executionType=BROADCAST with a runnable, a callable and a
+     * closure after starting two additional Ignite instances. Every step
+     * expects three outcomes -- presumably one per node (the base instance
+     * plus the two started here); confirm against AbstractIgniteTest.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testBroadcast() {
+        TestIgniteComputeResources.COUNTER.set(0);
+
+        startAdditionalGridInstance();
+        startAdditionalGridInstance();
+
+        // Register EVENT_COUNTER for EVT_JOB_FINISHED; the returned id is kept
+        // in LISTENERS so stopRemoteListeners() can stop it after the test.
+        ignite().events().enableLocal(EventType.EVT_JOB_FINISHED);
+        LISTENERS.add(ignite().events().remoteListen(null, 
TestIgniteComputeResources.EVENT_COUNTER, EventType.EVT_JOB_FINISHED));
+
+        // Single Runnable.
+        Object result = 
template.requestBody("ignite:compute:abc?executionType=BROADCAST", 
TestIgniteComputeResources.TEST_RUNNABLE, Object.class);
+        assert_().that(result).isNull();
+        assert_().that(TestIgniteComputeResources.COUNTER.get()).isEqualTo(3);
+
+        // Single Callable.
+        Collection<String> colResult = 
template.requestBody("ignite:compute:abc?executionType=BROADCAST", 
TestIgniteComputeResources.TEST_CALLABLE, Collection.class);
+        assert_().that(colResult).isNotNull();
+        assert_().that(colResult).containsExactly("hello", "hello", 
"hello").inOrder();
+
+        // Single Closure.
+        colResult = 
template.requestBodyAndHeader("ignite:compute:abc?executionType=BROADCAST", 
TestIgniteComputeResources.TEST_CLOSURE, IgniteConstants.IGNITE_COMPUTE_PARAMS, 
"Camel",
+                Collection.class);
+        assert_().that(colResult).isNotNull();
+        assert_().that(colResult).containsExactly("hello Camel", "hello 
Camel", "hello Camel").inOrder();
+    }
+
+    /**
+     * Exercises executionType=EXECUTE twice: once with the COMPUTE_TASK
+     * instance as body and once with its class. Both pass 10 in the
+     * IGNITE_COMPUTE_PARAMS header and expect a comma-separated result that
+     * contains "a0" through "a9".
+     */
+    @Test
+    public void testExecute() {
+        TestIgniteComputeResources.COUNTER.set(0);
+
+        startAdditionalGridInstance();
+        startAdditionalGridInstance();
+
+        // Register EVENT_COUNTER for EVT_JOB_RESULTED; the id goes into
+        // LISTENERS so it is stopped after the test.
+        ignite().events().enableLocal(EventType.EVT_JOB_RESULTED);
+        LISTENERS.add(ignite().events().remoteListen(null, 
TestIgniteComputeResources.EVENT_COUNTER, EventType.EVT_JOB_RESULTED));
+
+        // ComputeTask instance.
+        String result = 
template.requestBodyAndHeader("ignite:compute:abc?executionType=EXECUTE", 
TestIgniteComputeResources.COMPUTE_TASK, IgniteConstants.IGNITE_COMPUTE_PARAMS, 
10, String.class);
+        assert_().that(result).isNotNull();
+        
assert_().that(Splitter.on(",").splitToList(result)).containsAllOf("a0", "a1", 
"a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9");
+
+        // ComputeTask class.
+        result = 
template.requestBodyAndHeader("ignite:compute:abc?executionType=EXECUTE", 
TestIgniteComputeResources.COMPUTE_TASK.getClass(), 
IgniteConstants.IGNITE_COMPUTE_PARAMS, 10, String.class);
+        assert_().that(result).isNotNull();
+        
assert_().that(Splitter.on(",").splitToList(result)).containsAllOf("a0", "a1", 
"a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9");
+    }
+
+    /**
+     * Exercises executionType=APPLY with TEST_CLOSURE as body and the
+     * parameters in the IGNITE_COMPUTE_PARAMS header: a single parameter, a
+     * list of parameters, and a list of parameters plus a reducer header.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testApply() {
+        TestIgniteComputeResources.COUNTER.set(0);
+
+        // Closure with a single parameter.
+        String result = 
template.requestBodyAndHeader("ignite:compute:abc?executionType=APPLY", 
TestIgniteComputeResources.TEST_CLOSURE, IgniteConstants.IGNITE_COMPUTE_PARAMS, 
"Camel", String.class);
+        assert_().that(result).isEqualTo("hello Camel");
+
+        // Closure with a Collection of parameters.
+        Collection<String> colResult = 
template.requestBodyAndHeaders("ignite:compute:abc?executionType=APPLY", 
TestIgniteComputeResources.TEST_CLOSURE, IgniteConstants.IGNITE_COMPUTE_PARAMS,
+                Lists.newArrayList("Camel1", "Camel2", "Camel3"), 
Collection.class);
+        assert_().that(colResult).containsAllOf("hello Camel1", "hello 
Camel2", "hello Camel3");
+
+        // Closure with a Collection of parameters and a Reducer.
+        Map<String, Object> headers = ImmutableMap.<String, Object> 
of(IgniteConstants.IGNITE_COMPUTE_PARAMS, Lists.newArrayList("Camel1", 
"Camel2", "Camel3"), IgniteConstants.IGNITE_COMPUTE_REDUCER,
+                TestIgniteComputeResources.STRING_JOIN_REDUCER);
+        result = 
template.requestBodyAndHeaders("ignite:compute:abc?executionType=APPLY", 
TestIgniteComputeResources.TEST_CLOSURE, headers, String.class);
+        assert_().that(result).isEqualTo("hello Camel1hello Camel2hello 
Camel3");
+    }
+
+    // Returns true; presumably makes the Camel test support share a single
+    // CamelContext across the test methods of this class -- confirm.
+    @Override
+    public boolean isCreateCamelContextPerClass() {
+        return true;
+    }
+
+    // Starts one more Ignite instance from the configuration of a freshly
+    // built component and records it in ADDITIONAL_INSTANCES so that
+    // stopAdditionalIgniteInstances() can close it.
+    private void startAdditionalGridInstance() {
+        
ADDITIONAL_INSTANCES.add(Ignition.start(buildComponent().getIgniteConfiguration()));
+    }
+
+    /**
+     * Closes every Ignite instance started by the test that just ran.
+     * The list is static, so it is cleared in a finally block: a failing
+     * close() must not leave stale instances behind for the next test.
+     */
+    @After
+    public void stopAdditionalIgniteInstances() {
+        try {
+            for (Ignite ignite : ADDITIONAL_INSTANCES) {
+                ignite.close();
+            }
+        } finally {
+            ADDITIONAL_INSTANCES.clear();
+        }
+    }
+
+    /**
+     * Stops every remote event listener registered by the test that just
+     * ran. The list is static, so it is cleared in a finally block: a failing
+     * stopRemoteListen() must not leave stale ids behind for the next test.
+     */
+    @After
+    public void stopRemoteListeners() {
+        try {
+            for (UUID uuid : LISTENERS) {
+                ignite().events().stopRemoteListen(uuid);
+            }
+        } finally {
+            LISTENERS.clear();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteEventsTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteEventsTest.java
 
b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteEventsTest.java
new file mode 100644
index 0000000..316eacd
--- /dev/null
+++ 
b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteEventsTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite;
+
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.junit.After;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assert_;
+
+public class IgniteEventsTest extends AbstractIgniteTest {
+
+    /**
+     * Consumes from "ignite:events:abc" without an event filter, generates
+     * cache activity (put, get, remove, put with a 100 ms expiry, get after
+     * expiry) and checks that the listed event types were received in order.
+     */
+    @Test
+    public void testConsumeAllEvents() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("ignite:events:abc").to("mock:test1");
+            }
+        });
+
+        // NOTE(review): at least 9 exchanges are expected although only 8
+        // event types are listed below -- confirm which extra event is meant.
+        getMockEndpoint("mock:test1").expectedMinimumMessageCount(9);
+
+        IgniteCache<String, String> cache = ignite().getOrCreateCache("abc");
+
+        // Generate cache activity.
+        cache.put("abc", "123");
+        cache.get("abc");
+        cache.remove("abc");
+        cache.withExpiryPolicy(CreatedExpiryPolicy.factoryOf(new 
Duration(TimeUnit.MILLISECONDS, 100)).create()).put("abc", "123");
+
+        // Wait longer than the 100 ms expiry before reading the entry again.
+        Thread.sleep(150);
+
+        cache.get("abc");
+
+        assertMockEndpointsSatisfied();
+
+        List<Integer> eventTypes = receivedEventTypes("mock:test1");
+
+        assert_().that(eventTypes).containsAllOf(EventType.EVT_CACHE_STARTED, 
EventType.EVT_CACHE_ENTRY_CREATED, EventType.EVT_CACHE_OBJECT_PUT, 
EventType.EVT_CACHE_OBJECT_READ,
+                EventType.EVT_CACHE_OBJECT_REMOVED, 
EventType.EVT_CACHE_OBJECT_PUT, EventType.EVT_CACHE_OBJECT_EXPIRED, 
EventType.EVT_CACHE_OBJECT_READ).inOrder();
+
+    }
+
+    /**
+     * Binds a set containing only EVT_CACHE_OBJECT_PUT under the registry
+     * name "filter", references it from the endpoint URI (events=#filter)
+     * and checks that exactly the two put events are received.
+     */
+    @Test
+    public void testConsumeFilteredEventsWithRef() throws Exception {
+        context.getRegistry(JndiRegistry.class).bind("filter", 
Sets.newHashSet(EventType.EVT_CACHE_OBJECT_PUT));
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("ignite:events:abc?events=#filter").to("mock:test2");
+            }
+        });
+
+        getMockEndpoint("mock:test2").expectedMessageCount(2);
+
+        IgniteCache<String, String> cache = ignite().getOrCreateCache("abc");
+
+        // Generate cache activity.
+        // Only the two put() calls should pass the filter.
+        cache.put("abc", "123");
+        cache.get("abc");
+        cache.remove("abc");
+        cache.get("abc");
+        cache.put("abc", "123");
+
+        assertMockEndpointsSatisfied();
+
+        List<Integer> eventTypes = receivedEventTypes("mock:test2");
+
+        
assert_().that(eventTypes).containsExactly(EventType.EVT_CACHE_OBJECT_PUT, 
EventType.EVT_CACHE_OBJECT_PUT).inOrder();
+    }
+
+    /**
+     * Same scenario as the registry-reference variant, but the event filter
+     * is given inline in the endpoint URI (events=EVT_CACHE_OBJECT_PUT).
+     */
+    @Test
+    public void testConsumeFilteredEventsInline() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("ignite:events:abc?events=EVT_CACHE_OBJECT_PUT").to("mock:test3");
+            }
+        });
+
+        getMockEndpoint("mock:test3").expectedMessageCount(2);
+
+        IgniteCache<String, String> cache = ignite().getOrCreateCache("abc");
+
+        // Generate cache activity.
+        // Only the two put() calls should pass the filter.
+        cache.put("abc", "123");
+        cache.get("abc");
+        cache.remove("abc");
+        cache.get("abc");
+        cache.put("abc", "123");
+
+        assertMockEndpointsSatisfied();
+
+        List<Integer> eventTypes = receivedEventTypes("mock:test3");
+
+        
assert_().that(eventTypes).containsExactly(EventType.EVT_CACHE_OBJECT_PUT, 
EventType.EVT_CACHE_OBJECT_PUT).inOrder();
+
+    }
+
+    // Maps each exchange held by the given mock endpoint to the type of the
+    // Ignite Event in its body, keeping the order of the exchange list.
+    private List<Integer> receivedEventTypes(String mockEndpoint) {
+        Function<Exchange, Integer> toEventType = new Function<Exchange, Integer>() {
+            @Override
+            public Integer apply(Exchange exchange) {
+                Event event = exchange.getIn().getBody(Event.class);
+                return event.type();
+            }
+        };
+        List<Exchange> received = getMockEndpoint(mockEndpoint).getExchanges();
+        // Copy eagerly: Lists.transform() only returns a lazy view.
+        return Lists.newArrayList(Lists.transform(received, toEventType));
+    }
+
+    // Returns true; presumably makes the Camel test support share a single
+    // CamelContext across the test methods of this class -- confirm.
+    @Override
+    public boolean isCreateCamelContextPerClass() {
+        return true;
+    }
+
+    /**
+     * Stops every route that is currently started and then resets the mock
+     * endpoints, so the next test method does not see routes or mock state
+     * left over from this one.
+     *
+     * @throws Exception if stopping a route fails
+     */
+    @After
+    public void stopAllRoutes() throws Exception {
+        for (Route route : context.getRoutes()) {
+            // Skip routes that are not running but keep iterating: returning
+            // here would leave later routes started and skip resetMocks().
+            if (context.getRouteStatus(route.getId()) != ServiceStatus.Started) {
+                continue;
+            }
+            context.stopRoute(route.getId());
+        }
+        resetMocks();
+    }
+
+    // Builds the component from an IgniteConfiguration whose included event
+    // types are EventType.EVTS_ALL_MINUS_METRIC_UPDATE (presumably needed so
+    // that the cache events asserted in this class are recorded -- confirm).
+    @Override
+    protected IgniteComponent buildComponent() {
+        IgniteConfiguration config = new IgniteConfiguration();
+        config.setIncludeEventTypes(EventType.EVTS_ALL_MINUS_METRIC_UPDATE);
+        return IgniteComponent.fromConfiguration(config);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteIdGenTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteIdGenTest.java
 
b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteIdGenTest.java
new file mode 100644
index 0000000..c310e41
--- /dev/null
+++ 
b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteIdGenTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.camel.component.ignite.idgen.IgniteIdGenEndpoint;
+import org.apache.camel.component.ignite.idgen.IgniteIdGenOperation;
+import org.apache.ignite.IgniteAtomicSequence;
+import org.junit.After;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assert_;
+
+public class IgniteIdGenTest extends AbstractIgniteTest {
+
+    /**
+     * Runs each id generator operation once against sequence "abc" with
+     * initialValue=0. The expected values form one running sequence:
+     * 0, 0, 2, 7, 7 and finally 12.
+     */
+    @Test
+    public void testOperations() {
+        
assert_().that(template.requestBody("ignite:idgen:abc?initialValue=0&operation=GET",
 null, Long.class)).isEqualTo(0);
+        // Expected to return the value before the increment.
+        
assert_().that(template.requestBody("ignite:idgen:abc?initialValue=0&operation=GET_AND_INCREMENT",
 null, Long.class)).isEqualTo(0);
+        
assert_().that(template.requestBody("ignite:idgen:abc?initialValue=0&operation=INCREMENT_AND_GET",
 null, Long.class)).isEqualTo(2);
+        // The body (5) is the amount to add.
+        
assert_().that(template.requestBody("ignite:idgen:abc?initialValue=0&operation=ADD_AND_GET",
 5, Long.class)).isEqualTo(7);
+        // Expected to return the value before the addition.
+        
assert_().that(template.requestBody("ignite:idgen:abc?initialValue=0&operation=GET_AND_ADD",
 5, Long.class)).isEqualTo(7);
+        
assert_().that(template.requestBody("ignite:idgen:abc?initialValue=0&operation=GET",
 5, Long.class)).isEqualTo(12);
+    }
+
+    /**
+     * Repeats the operation sequence with initialValue=100 and expects the
+     * same progression shifted by 100: 100, 100, 102, 107, 107, 112.
+     */
+    @Test
+    public void testInitialValue() {
+        
assert_().that(template.requestBody("ignite:idgen:abc?operation=GET&initialValue=100",
 null, Long.class)).isEqualTo(100);
+        
assert_().that(template.requestBody("ignite:idgen:abc?operation=GET_AND_INCREMENT&initialValue=100",
 null, Long.class)).isEqualTo(100);
+        
assert_().that(template.requestBody("ignite:idgen:abc?operation=INCREMENT_AND_GET&initialValue=100",
 null, Long.class)).isEqualTo(102);
+        
assert_().that(template.requestBody("ignite:idgen:abc?operation=ADD_AND_GET&initialValue=100",
 5, Long.class)).isEqualTo(107);
+        
assert_().that(template.requestBody("ignite:idgen:abc?operation=GET_AND_ADD&initialValue=100",
 5, Long.class)).isEqualTo(107);
+        
assert_().that(template.requestBody("ignite:idgen:abc?operation=GET&initialValue=100",
 5, Long.class)).isEqualTo(112);
+    }
+
+    /**
+     * Checks that the IGNITE_IDGEN_OPERATION header is honoured: the URI
+     * asks for GET_AND_INCREMENT, the header for INCREMENT_AND_GET, and the
+     * expected reply (101) matches the header's operation.
+     */
+    @Test
+    public void testDifferentOperation() {
+        
assert_().that(template.requestBody("ignite:idgen:abc?operation=GET&initialValue=100",
 null, Long.class)).isEqualTo(100);
+        
assert_().that(template.requestBodyAndHeader("ignite:idgen:abc?operation=GET_AND_INCREMENT&initialValue=100",
 null, IgniteConstants.IGNITE_IDGEN_OPERATION,
+                IgniteIdGenOperation.INCREMENT_AND_GET, 
Long.class)).isEqualTo(101);
+    }
+
+    /**
+     * Checks that the batchSize URI parameter reaches the endpoint and that
+     * the endpoint still answers a GET with the initial value.
+     */
+    @Test
+    public void testBatchSize() {
+        IgniteIdGenEndpoint endpoint = context.getEndpoint(
+                "ignite:idgen:abc?operation=GET&initialValue=100&batchSize=100",
+                IgniteIdGenEndpoint.class);
+        assert_().that(template.requestBody(endpoint, null,
+                Long.class)).isEqualTo(100);
+
+        // Cannot test much here with a single Ignite instance, so just check
+        // that the parameter was set. The original statement stopped at
+        // that(...) and therefore asserted nothing.
+        assert_().that(endpoint.getBatchSize()).isEqualTo(100);
+    }
+
+    // Returns true; presumably makes the Camel test support share a single
+    // CamelContext across the test methods of this class -- confirm.
+    @Override
+    public boolean isCreateCamelContextPerClass() {
+        return true;
+    }
+
+    // Closes the "abc" atomic sequence after each test so the next test can
+    // start from its own initial value; names for which atomicSequence()
+    // returns null are skipped.
+    @After
+    public void deleteSets() {
+        for (String sequenceName : ImmutableSet.<String> of("abc")) {
+            IgniteAtomicSequence sequence = ignite().atomicSequence(sequenceName, 0, false);
+            if (sequence != null) {
+                sequence.close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteMessagingTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteMessagingTest.java
 
b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteMessagingTest.java
new file mode 100644
index 0000000..a1cee04
--- /dev/null
+++ 
b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteMessagingTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.junit.After;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assert_;
+import static com.jayway.awaitility.Awaitility.await;
+import static com.jayway.awaitility.Awaitility.to;
+import static org.hamcrest.Matchers.equalTo;
+
+public class IgniteMessagingTest extends AbstractIgniteTest implements 
Serializable {
+
+    private static final long serialVersionUID = 3967738538216977749L;
+
+    // Topic names used by the tests and by the endpoint URIs.
+    private static final String TOPIC1 = "TOPIC1";
+    private static final String TOPIC2 = "TOPIC2";
+    // Id of the listener most recently registered by setupMessageListener();
+    // reset to null once stopMessageListener() has stopped it.
+    private UUID uuid;
+
+    /**
+     * Sends the integer 1 through the messaging producer and waits up to
+     * five seconds for the TOPIC1 listener to receive exactly that message.
+     */
+    @Test
+    public void testProducerSendMessage() {
+        List<Object> messages = Lists.newArrayList();
+        setupMessageListener(TOPIC1, messages);
+
+        template.requestBody("ignite:messaging:TOPIC1", 1);
+
+        await().atMost(5, TimeUnit.SECONDS).untilCall(to(messages).size(), 
equalTo(1));
+        assert_().that(messages.get(0)).isEqualTo(1);
+    }
+
+    /**
+     * Sends to an endpoint configured for TOPIC1 while the
+     * IGNITE_MESSAGING_TOPIC header names TOPIC2, and checks that only the
+     * TOPIC2 listener receives the message.
+     *
+     * @throws Exception if the wait is interrupted
+     */
+    @Test
+    public void testProducerSendMessageTopicInHeader() throws Exception {
+        List<Object> messages1 = Lists.newArrayList();
+        setupMessageListener(TOPIC1, messages1);
+        // setupMessageListener() keeps only the latest listener id in the
+        // uuid field, so the @After hook would stop just the TOPIC2 listener.
+        // Remember the TOPIC1 listener and stop it here to avoid leaking it.
+        UUID topic1Listener = uuid;
+
+        try {
+            List<Object> messages2 = Lists.newArrayList();
+            setupMessageListener(TOPIC2, messages2);
+
+            template.requestBodyAndHeader("ignite:messaging:TOPIC1", 1,
+                    IgniteConstants.IGNITE_MESSAGING_TOPIC, "TOPIC2");
+
+            // Fixed wait: absence of a message on TOPIC1 cannot be awaited.
+            Thread.sleep(1000);
+            assert_().that(messages1.size()).isEqualTo(0);
+            assert_().that(messages2.size()).isEqualTo(1);
+        } finally {
+            ignite().message().stopRemoteListen(topic1Listener);
+        }
+    }
+
+    /**
+     * Sends a set of the integers 0..99 as one request body and expects the
+     * listener to receive 100 individual messages covering all of them.
+     */
+    @Test
+    public void testProducerSendManyMessages() {
+        List<Object> messages = Lists.newArrayList();
+        setupMessageListener(TOPIC1, messages);
+
+        Set<Integer> request = ContiguousSet.create(Range.closedOpen(0, 100), 
DiscreteDomain.integers());
+        template.requestBody("ignite:messaging:TOPIC1", request);
+
+        await().atMost(5, TimeUnit.SECONDS).untilCall(to(messages).size(), 
equalTo(100));
+        assert_().that(messages).containsAllIn(request);
+    }
+
+    /**
+     * Sends the integers 0..99 one request at a time through an endpoint
+     * with sendMode=ORDERED and timeout=1000, then expects 100 messages.
+     * NOTE(review): containsAllIn() is not followed by inOrder(), so the
+     * delivery order itself is not asserted -- confirm whether intended.
+     */
+    @Test
+    public void testProducerSendManyMessagesOrdered() {
+        List<Object> messages = Lists.newArrayList();
+        setupMessageListener(TOPIC1, messages);
+
+        ContiguousSet<Integer> set = ContiguousSet.create(Range.closedOpen(0, 
100), DiscreteDomain.integers());
+        for (int i : set) {
+            
template.requestBody("ignite:messaging:TOPIC1?sendMode=ORDERED&timeout=1000", 
i);
+        }
+
+        await().atMost(5, TimeUnit.SECONDS).untilCall(to(messages).size(), 
equalTo(100));
+        assert_().that(messages).containsAllIn(set);
+    }
+
+    /**
+     * Sends a set of 0..99 with treatCollectionsAsCacheObjects=true and
+     * expects a single message that equals the whole set, i.e. the
+     * collection is not split into individual messages.
+     */
+    @Test
+    public void testProducerSendCollectionAsObject() {
+        List<Object> messages = Lists.newArrayList();
+        setupMessageListener(TOPIC1, messages);
+
+        Set<Integer> request = ContiguousSet.create(Range.closedOpen(0, 100), 
DiscreteDomain.integers());
+        
template.requestBody("ignite:messaging:TOPIC1?treatCollectionsAsCacheObjects=true",
 request);
+
+        await().atMost(5, TimeUnit.SECONDS).untilCall(to(messages).size(), 
equalTo(1));
+        assert_().that(messages.get(0)).isEqualTo(request);
+    }
+
+    /**
+     * Starts a consumer on TOPIC1 that stores every body in a list, sends a
+     * set of 0..99 directly through the Ignite messaging API and waits until
+     * the consumer has produced 100 exchanges.
+     *
+     * @throws Exception if the consumer cannot be created, started or stopped
+     */
+    @Test
+    public void testConsumerManyMessages() throws Exception {
+        List<Object> messages = Lists.newArrayList();
+        Consumer consumer = context.getEndpoint("ignite:messaging:TOPIC1")
+                .createConsumer(storeBodyInListProcessor(messages));
+        consumer.start();
+
+        // Stop the consumer even when the await below times out; otherwise
+        // it would stay subscribed while later tests in this class run.
+        try {
+            Set<Integer> messagesToSend = ContiguousSet.create(
+                    Range.closedOpen(0, 100), DiscreteDomain.integers());
+            ignite().message().send(TOPIC1, messagesToSend);
+
+            await().atMost(5, TimeUnit.SECONDS)
+                    .untilCall(to(messages).size(), equalTo(100));
+        } finally {
+            consumer.stop();
+        }
+    }
+
+    // Registers a remote listener on the given topic that appends every
+    // received message to the supplied list, and stores the listener id in
+    // the uuid field. A second call overwrites the id of the first listener.
+    private void setupMessageListener(String topic, final List<Object> 
messages) {
+        uuid = ignite().message().remoteListen(topic, new 
IgniteBiPredicate<UUID, Object>() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public boolean apply(UUID uuid, Object message) {
+                messages.add(message);
+                // Presumably true keeps the listener subscribed -- confirm
+                // against the Ignite messaging documentation.
+                return true;
+            }
+        });
+    }
+
+    // Stops the listener whose id is held in the uuid field, if any, and
+    // clears the field so the listener is not stopped twice.
+    @After
+    public void stopMessageListener() {
+        if (uuid != null) {
+            ignite().message().stopRemoteListen(uuid);
+            uuid = null;
+        }
+    }
+
+    // Returns true; presumably makes the Camel test support share a single
+    // CamelContext across the test methods of this class -- confirm.
+    @Override
+    public boolean isCreateCamelContextPerClass() {
+        return true;
+    }
+
+    // Returns a Processor that appends the IN body of every exchange it
+    // processes to the given list.
+    private Processor storeBodyInListProcessor(final List<Object> list) {
+        return new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                list.add(exchange.getIn().getBody());
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteQueueTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteQueueTest.java
 
b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteQueueTest.java
new file mode 100644
index 0000000..77fb769
--- /dev/null
+++ 
b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteQueueTest.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.camel.component.ignite.queue.IgniteQueueEndpoint;
+import org.apache.camel.component.ignite.queue.IgniteQueueOperation;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CollectionConfiguration;
+import org.junit.After;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assert_;
+
+
+public class IgniteQueueTest extends AbstractIgniteTest {
+
+    /**
+     * Exercises ADD, CONTAINS and REMOVE on queue "abc" and cross-checks
+     * each step against the queue obtained directly from Ignite.
+     */
+    @Test
+    public void testOperations() {
+        boolean result = 
template.requestBody("ignite:queue:abc?operation=ADD", "hello", boolean.class);
+        assert_().that(result).isTrue();
+        assert_().that(ignite().queue("abc", 0, new 
CollectionConfiguration()).contains("hello")).isTrue();
+
+        result = template.requestBody("ignite:queue:abc?operation=CONTAINS", 
"hello", boolean.class);
+        assert_().that(result).isTrue();
+        assert_().that(ignite().queue("abc", 0, new 
CollectionConfiguration()).contains("hello")).isTrue();
+
+        result = template.requestBody("ignite:queue:abc?operation=REMOVE", 
"hello", boolean.class);
+        assert_().that(result).isTrue();
+        assert_().that(ignite().queue("abc", 0, new 
CollectionConfiguration()).contains("hello")).isFalse();
+
+        // After the removal CONTAINS must report false as well.
+        result = template.requestBody("ignite:queue:abc?operation=CONTAINS", 
"hello", boolean.class);
+        assert_().that(result).isFalse();
+    }
+
+    /**
+     * Fills queue "abc" with "hello0".."hello99" and then exercises SIZE,
+     * RETAIN_ALL (selected via header), ITERATOR, ARRAY and CLEAR,
+     * cross-checking sizes against the queue obtained directly from Ignite.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testOperations2() {
+        for (int i = 0; i < 100; i++) {
+            template.requestBody("ignite:queue:abc?operation=ADD", "hello" + 
i);
+        }
+
+        // SIZE
+        int size = template.requestBody("ignite:queue:abc?operation=SIZE", 
"hello", int.class);
+        assert_().that(size).isEqualTo(100);
+        assert_().that(ignite().queue("abc", 0, new 
CollectionConfiguration()).size()).isEqualTo(100);
+
+        List<String> toRetain = Lists.newArrayList();
+        for (int i = 0; i < 50; i++) {
+            toRetain.add("hello" + i);
+        }
+
+        // RETAIN_ALL
+        // The URI says CLEAR; the IGNITE_QUEUE_OPERATION header selects
+        // RETAIN_ALL instead, which the size check below (50) relies on.
+        boolean retained = 
template.requestBodyAndHeader("ignite:queue:abc?operation=CLEAR", toRetain, 
IgniteConstants.IGNITE_QUEUE_OPERATION, IgniteQueueOperation.RETAIN_ALL, 
boolean.class);
+        assert_().that(retained).isTrue();
+
+        // SIZE
+        size = template.requestBody("ignite:queue:abc?operation=SIZE", 
"hello", int.class);
+        assert_().that(size).isEqualTo(50);
+        assert_().that(ignite().queue("abc", 0, new 
CollectionConfiguration()).size()).isEqualTo(50);
+
+        // ITERATOR
+        Iterator<String> iterator = 
template.requestBody("ignite:queue:abc?operation=ITERATOR", "hello", 
Iterator.class);
+        assert_().that(Iterators.toArray(iterator, 
String.class)).asList().containsExactlyElementsIn(toRetain).inOrder();
+
+        // ARRAY
+        String[] array = 
template.requestBody("ignite:queue:abc?operation=ARRAY", "hello", 
String[].class);
+        
assert_().that(array).asList().containsExactlyElementsIn(toRetain).inOrder();
+
+        // CLEAR
+        // The reply is expected to be the request body ("hello") unchanged.
+        Object result = 
template.requestBody("ignite:queue:abc?operation=CLEAR", "hello", String.class);
+        assert_().that(result).isEqualTo("hello");
+        assert_().that(ignite().queue("abc", 0, new 
CollectionConfiguration()).size()).isEqualTo(0);
+
+        // SIZE
+        size = template.requestBody("ignite:queue:abc?operation=SIZE", 
"hello", int.class);
+        assert_().that(size).isEqualTo(0);
+        assert_().that(ignite().queue("abc", 0, new 
CollectionConfiguration()).size()).isEqualTo(0);
+    }
+
+    /**
+     * Verifies that RETAIN_ALL accepts a single (non-collection) body: after retaining "hello10"
+     * out of 100 elements, the queue must contain exactly that one element.
+     */
+    @Test
+    public void testRetainSingle() {
+        // Fill data.
+        for (int i = 0; i < 100; i++) {
+            template.requestBody("ignite:queue:abc?operation=ADD", "hello" + 
i);
+        }
+
+        // RETAIN_ALL with a single String as the body.
+        boolean retained = 
template.requestBody("ignite:queue:abc?operation=RETAIN_ALL", "hello10", 
boolean.class);
+        assert_().that(retained).isTrue();
+
+        // ARRAY
+        String[] array = 
template.requestBody("ignite:queue:abc?operation=ARRAY", "hello", 
String[].class);
+        assert_().that(array).asList().containsExactly("hello10");
+    }
+
+    /**
+     * Verifies the treatCollectionsAsCacheObjects option: a Set sent as the body is added to,
+     * looked up in and removed from the queue as ONE element rather than element by element.
+     */
+    @Test
+    public void testCollectionsAsCacheObject() {
+        // Fill data.
+        for (int i = 0; i < 100; i++) {
+            template.requestBody("ignite:queue:abc?operation=ADD", "hello" + 
i);
+        }
+
+        // Add the Set to the queue as a single element.
+        Set<String> toAdd = Sets.newHashSet("hello101", "hello102", 
"hello103");
+        
template.requestBody("ignite:queue:abc?operation=ADD&treatCollectionsAsCacheObjects=true",
 toAdd);
+
+        // Size must be 101, not 103.
+        int size = template.requestBody("ignite:queue:abc?operation=SIZE", 
"hello", int.class);
+        assert_().that(size).isEqualTo(101);
+        assert_().that(ignite().queue("abc", 0, new 
CollectionConfiguration()).size()).isEqualTo(101);
+        assert_().that(ignite().queue("abc", 0, new 
CollectionConfiguration()).contains(toAdd)).isTrue();
+
+        // Check whether the queue contains the Set as an element.
+        boolean contains = 
template.requestBody("ignite:queue:abc?operation=CONTAINS&treatCollectionsAsCacheObjects=true",
 toAdd, boolean.class);
+        assert_().that(contains).isTrue();
+
+        // Remove the Set from the queue.
+        
template.requestBody("ignite:queue:abc?operation=REMOVE&treatCollectionsAsCacheObjects=true",
 toAdd);
+
+        // Size must be 100 again.
+        size = template.requestBody("ignite:queue:abc?operation=SIZE", 
"hello", int.class);
+        assert_().that(size).isEqualTo(100);
+        assert_().that(ignite().queue("abc", 0, new 
CollectionConfiguration()).size()).isEqualTo(100);
+        assert_().that(ignite().queue("abc", 0, new 
CollectionConfiguration()).contains(toAdd)).isFalse();
+
+    }
+
+    /**
+     * Verifies that a CollectionConfiguration bound in the registry can be referenced from the
+     * endpoint URI (configuration=#config) and is the configuration the endpoint ends up with.
+     */
+    @Test
+    public void testWithConfiguration() {
+        CollectionConfiguration configuration = new CollectionConfiguration();
+        configuration.setCacheMode(CacheMode.LOCAL);
+
+        // Make the configuration resolvable as "#config" from the endpoint URI.
+        context.getRegistry(JndiRegistry.class).bind("config", configuration);
+
+        IgniteQueueEndpoint igniteEndpoint = 
context.getEndpoint("ignite:queue:abc?operation=ADD&configuration=#config", 
IgniteQueueEndpoint.class);
+        template.requestBody(igniteEndpoint, "hello");
+
+        // The element must be visible through a queue obtained with the same configuration.
+        assert_().that(ignite().queue("abc", 0, 
configuration).size()).isEqualTo(1);
+        
assert_().that(igniteEndpoint.getConfiguration()).isEqualTo(configuration);
+    }
+
+    /**
+     * Exercises a bounded queue ("def", capacity 100) and the operations that only make sense on
+     * it: ADD/OFFER on a full queue, a blocking PUT, PEEK, ELEMENT, TAKE, DRAIN, POLL, a blocking
+     * TAKE on an empty queue and POLL with a timeout.
+     *
+     * @throws Exception if waiting on a latch or on the POLL future is interrupted or fails
+     */
+    @Test
+    public void testBoundedQueueAndOtherOperations() throws Exception {
+        // Fill the queue up to its capacity.
+        for (int i = 0; i < 100; i++) {
+            template.requestBody("ignite:queue:def?operation=ADD&capacity=100", "hello" + i);
+        }
+
+        // NOTE: Unfortunately the behaviour of IgniteQueue doesn't adhere to the contract of the overridden
+        // ADD method: on a full queue it should throw an exception, but it returns false just like OFFER.
+        assert_().that(template.requestBody("ignite:queue:def?operation=ADD&capacity=100", "hello101", boolean.class)).isFalse();
+        assert_().that(template.requestBody("ignite:queue:def?operation=OFFER&capacity=100", "hello101", boolean.class)).isFalse();
+
+        // PUT on a full queue must block; the latch is only released if the PUT returns.
+        final CountDownLatch putLatch = new CountDownLatch(1);
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                assert_().that(template.requestBody("ignite:queue:def?operation=PUT&capacity=100", "hello101", boolean.class)).isFalse();
+                putLatch.countDown();
+            }
+        });
+
+        t.start();
+
+        // Wait 2 seconds and check that the thread was blocked.
+        assert_().that(putLatch.await(2000, TimeUnit.MILLISECONDS)).isFalse();
+        t.interrupt();
+
+        // PEEK and ELEMENT.
+        assert_().that(template.requestBody("ignite:queue:def?operation=PEEK&capacity=100", null, String.class)).isEqualTo("hello0");
+        assert_().that(template.requestBody("ignite:queue:def?operation=ELEMENT&capacity=100", null, String.class)).isEqualTo("hello0");
+
+        // TAKE.
+        assert_().that(template.requestBody("ignite:queue:def?operation=TAKE&capacity=100", null, String.class)).isEqualTo("hello0");
+        assert_().that(template.requestBody("ignite:queue:def?operation=SIZE&capacity=100", null, int.class)).isEqualTo(99);
+
+        // Now drain.
+        assert_().that(template.requestBody("ignite:queue:def?operation=DRAIN&capacity=100", null, String[].class)).asList().hasSize(99);
+        assert_().that(template.requestBody("ignite:queue:def?operation=SIZE&capacity=100", null, int.class)).isEqualTo(0);
+        assert_().that(template.requestBody("ignite:queue:def?operation=POLL&capacity=100", null, String.class)).isNull();
+
+        // TAKE on the now-empty queue must block until an element is added.
+        // A dedicated latch is used so that a late count-down from the PUT thread above can never
+        // be mistaken for the completion of this TAKE.
+        final CountDownLatch takeLatch = new CountDownLatch(1);
+        t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                assert_().that(template.requestBody("ignite:queue:def?operation=TAKE&capacity=100", null, String.class)).isEqualTo("hello102");
+                takeLatch.countDown();
+            }
+        });
+
+        t.start();
+
+        // Element was returned.
+        assert_().that(template.requestBody("ignite:queue:def?operation=ADD&capacity=100", "hello102", boolean.class)).isTrue();
+        assert_().that(takeLatch.await(1000, TimeUnit.MILLISECONDS)).isTrue();
+
+        // POLL with a timeout: on an empty queue it must return null after at least the timeout.
+        // The executor is shut down afterwards so that its worker thread does not outlive the test.
+        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            assert_().that(executor.submit(new Callable<Long>() {
+                @Override
+                public Long call() throws Exception {
+                    Stopwatch sw = Stopwatch.createStarted();
+                    assert_().that(template.requestBody("ignite:queue:def?operation=POLL&timeoutMillis=1000&capacity=100", null, String.class)).isNull();
+                    return sw.elapsed(TimeUnit.MILLISECONDS);
+                }
+            }).get()).isAtLeast(1000L);
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Always returns {@code true}.
+     * NOTE(review): presumably this makes the Camel test support create one CamelContext for the
+     * whole test class instead of one per test method - confirm against the base test class.
+     */
+    @Override
+    public boolean isCreateCamelContextPerClass() {
+        return true;
+    }
+
+    /**
+     * Closes the queues used by the tests after every test method: the unbounded queue "abc"
+     * (capacity 0) and the bounded queue "def" (capacity 100). The tests above each expect to
+     * start from an empty queue.
+     */
+    @After
+    public void deleteQueues() {
+        // Unbounded queues.
+        for (String queueName : ImmutableSet.<String> of("abc")) {
+            ignite().queue(queueName, 0, new 
CollectionConfiguration()).close();
+        }
+
+        // Bounded queues.
+        for (String queueName : ImmutableSet.<String> of("def")) {
+            ignite().queue(queueName, 100, new 
CollectionConfiguration()).close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteSetTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteSetTest.java
 
b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteSetTest.java
new file mode 100644
index 0000000..3c3c627
--- /dev/null
+++ 
b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteSetTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.camel.component.ignite.set.IgniteSetEndpoint;
+import org.apache.camel.component.ignite.set.IgniteSetOperation;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CollectionConfiguration;
+import org.junit.After;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assert_;
+
+public class IgniteSetTest extends AbstractIgniteTest {
+
+    /**
+     * Verifies the ADD, CONTAINS and REMOVE operations of the set endpoint on set "abc",
+     * checking both the value returned by the producer and the content of the Ignite set itself.
+     */
+    @Test
+    public void testOperations() {
+        // ADD: the producer reports success and the element is visible in the Ignite set.
+        boolean result = template.requestBody("ignite:set:abc?operation=ADD", 
"hello", boolean.class);
+        assert_().that(result).isTrue();
+        assert_().that(ignite().set("abc", new 
CollectionConfiguration()).contains("hello")).isTrue();
+
+        // CONTAINS: the element is found and the set is left unchanged.
+        result = template.requestBody("ignite:set:abc?operation=CONTAINS", 
"hello", boolean.class);
+        assert_().that(result).isTrue();
+        assert_().that(ignite().set("abc", new 
CollectionConfiguration()).contains("hello")).isTrue();
+
+        // REMOVE: the element is removed from the Ignite set.
+        result = template.requestBody("ignite:set:abc?operation=REMOVE", 
"hello", boolean.class);
+        assert_().that(result).isTrue();
+        assert_().that(ignite().set("abc", new 
CollectionConfiguration()).contains("hello")).isFalse();
+
+        // CONTAINS after REMOVE must report false.
+        result = template.requestBody("ignite:set:abc?operation=CONTAINS", 
"hello", boolean.class);
+        assert_().that(result).isFalse();
+    }
+
+    /**
+     * Verifies the SIZE, RETAIN_ALL, ITERATOR, ARRAY and CLEAR operations on a set that is
+     * pre-filled with the 100 elements "hello0" .. "hello99".
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testOperations2() {
+        for (int i = 0; i < 100; i++) {
+            template.requestBody("ignite:set:abc?operation=ADD", "hello" + i);
+        }
+
+        // SIZE
+        int size = template.requestBody("ignite:set:abc?operation=SIZE", 
"hello", int.class);
+        assert_().that(size).isEqualTo(100);
+        assert_().that(ignite().set("abc", new 
CollectionConfiguration()).size()).isEqualTo(100);
+
+        List<String> toRetain = Lists.newArrayList();
+        for (int i = 0; i < 50; i++) {
+            toRetain.add("hello" + i);
+        }
+
+        // RETAIN_ALL
+        // The endpoint URI says CLEAR, but the operation header is set to RETAIN_ALL; the size
+        // assertions below (50, not 0) show that the header wins over the URI option.
+        boolean retained = 
template.requestBodyAndHeader("ignite:set:abc?operation=CLEAR", toRetain, 
IgniteConstants.IGNITE_SETS_OPERATION, IgniteSetOperation.RETAIN_ALL, 
boolean.class);
+        assert_().that(retained).isTrue();
+
+        // SIZE
+        size = template.requestBody("ignite:set:abc?operation=SIZE", "hello", 
int.class);
+        assert_().that(size).isEqualTo(50);
+        assert_().that(ignite().set("abc", new 
CollectionConfiguration()).size()).isEqualTo(50);
+
+        // ITERATOR
+        // Only membership is checked here; no ordering is asserted for a set.
+        Iterator<String> iterator = 
template.requestBody("ignite:set:abc?operation=ITERATOR", "hello", 
Iterator.class);
+        assert_().that(Iterators.toArray(iterator, 
String.class)).asList().containsExactlyElementsIn(toRetain);
+
+        // ARRAY
+        String[] array = 
template.requestBody("ignite:set:abc?operation=ARRAY", "hello", String[].class);
+        assert_().that(array).asList().containsExactlyElementsIn(toRetain);
+
+        // CLEAR
+        // The reply body is the unchanged request body ("hello"); only the set is emptied.
+        Object result = template.requestBody("ignite:set:abc?operation=CLEAR", 
"hello", String.class);
+        assert_().that(result).isEqualTo("hello");
+        assert_().that(ignite().set("abc", new 
CollectionConfiguration()).size()).isEqualTo(0);
+
+        // SIZE
+        size = template.requestBody("ignite:set:abc?operation=SIZE", "hello", 
int.class);
+        assert_().that(size).isEqualTo(0);
+        assert_().that(ignite().set("abc", new 
CollectionConfiguration()).size()).isEqualTo(0);
+    }
+
+    /**
+     * Verifies that RETAIN_ALL accepts a single (non-collection) body: after retaining "hello10"
+     * out of 100 elements, the set must contain exactly that one element.
+     */
+    @Test
+    public void testRetainSingle() {
+        // Fill data.
+        for (int i = 0; i < 100; i++) {
+            template.requestBody("ignite:set:abc?operation=ADD", "hello" + i);
+        }
+
+        // RETAIN_ALL with a single String as the body.
+        boolean retained = 
template.requestBody("ignite:set:abc?operation=RETAIN_ALL", "hello10", 
boolean.class);
+        assert_().that(retained).isTrue();
+
+        // ARRAY
+        String[] array = 
template.requestBody("ignite:set:abc?operation=ARRAY", "hello", String[].class);
+        assert_().that(array).asList().containsExactly("hello10");
+    }
+
+    /**
+     * Verifies the treatCollectionsAsCacheObjects option: a Set sent as the body is added to,
+     * looked up in and removed from the Ignite set as ONE element rather than element by element.
+     */
+    @Test
+    public void testCollectionsAsCacheObject() {
+        // Fill data.
+        for (int i = 0; i < 100; i++) {
+            template.requestBody("ignite:set:abc?operation=ADD", "hello" + i);
+        }
+
+        // Add the set.
+        Set<String> toAdd = Sets.newHashSet("hello101", "hello102", 
"hello103");
+        
template.requestBody("ignite:set:abc?operation=ADD&treatCollectionsAsCacheObjects=true",
 toAdd);
+
+        // Size must be 101, not 103.
+        int size = template.requestBody("ignite:set:abc?operation=SIZE", 
"hello", int.class);
+        assert_().that(size).isEqualTo(101);
+        assert_().that(ignite().set("abc", new 
CollectionConfiguration()).size()).isEqualTo(101);
+        assert_().that(ignite().set("abc", new 
CollectionConfiguration()).contains(toAdd)).isTrue();
+
+        // Check whether the Ignite set contains the Set as an element.
+        boolean contains = 
template.requestBody("ignite:set:abc?operation=CONTAINS&treatCollectionsAsCacheObjects=true",
 toAdd, boolean.class);
+        assert_().that(contains).isTrue();
+
+        // Delete the Set.
+        
template.requestBody("ignite:set:abc?operation=REMOVE&treatCollectionsAsCacheObjects=true",
 toAdd);
+
+        // Size must be 100 again.
+        size = template.requestBody("ignite:set:abc?operation=SIZE", "hello", 
int.class);
+        assert_().that(size).isEqualTo(100);
+        assert_().that(ignite().set("abc", new 
CollectionConfiguration()).size()).isEqualTo(100);
+        assert_().that(ignite().set("abc", new 
CollectionConfiguration()).contains(toAdd)).isFalse();
+
+    }
+
+    /**
+     * Verifies that a CollectionConfiguration bound in the registry can be referenced from the
+     * endpoint URI (configuration=#config) and is the configuration the endpoint ends up with.
+     */
+    @Test
+    public void testWithConfiguration() {
+        CollectionConfiguration configuration = new CollectionConfiguration();
+        configuration.setCacheMode(CacheMode.LOCAL);
+
+        // Make the configuration resolvable as "#config" from the endpoint URI.
+        context.getRegistry(JndiRegistry.class).bind("config", configuration);
+
+        IgniteSetEndpoint igniteEndpoint = 
context.getEndpoint("ignite:set:abc?operation=ADD&configuration=#config", 
IgniteSetEndpoint.class);
+        template.requestBody(igniteEndpoint, "hello");
+
+        // The element must be visible through a set obtained with the same configuration.
+        assert_().that(ignite().set("abc", configuration).size()).isEqualTo(1);
+        
assert_().that(igniteEndpoint.getConfiguration()).isEqualTo(configuration);
+    }
+
+    /**
+     * Always returns {@code true}.
+     * NOTE(review): presumably this makes the Camel test support create one CamelContext for the
+     * whole test class instead of one per test method - confirm against the base test class.
+     */
+    @Override
+    public boolean isCreateCamelContextPerClass() {
+        return true;
+    }
+
+    /**
+     * Closes the Ignite set "abc" after every test method; the tests above each expect to
+     * start from an empty set.
+     */
+    @After
+    public void deleteSets() {
+        for (String setName : ImmutableSet.<String> of("abc")) {
+            ignite().set(setName, new CollectionConfiguration()).close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/TestIgniteComputeResources.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/TestIgniteComputeResources.java
 
b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/TestIgniteComputeResources.java
new file mode 100644
index 0000000..7c31e16e
--- /dev/null
+++ 
b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/TestIgniteComputeResources.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskSplitAdapter;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteRunnable;
+
+/**
+ * Holder of reusable Ignite compute fixtures (runnables, callables, closures, a task and a
+ * reducer) shared by the tests. The class only exposes constants and cannot be instantiated.
+ */
+public final class TestIgniteComputeResources {
+
+    /** Counter incremented by {@link #TEST_RUNNABLE_COUNTER} and {@link #EVENT_COUNTER}. */
+    public static final AtomicInteger COUNTER = new AtomicInteger(0);
+
+    /** Runnable that only prints a greeting to standard output. */
+    public static final IgniteRunnable TEST_RUNNABLE = new IgniteRunnable() {
+        private static final long serialVersionUID = -4961602602993218883L;
+
+        @Override
+        public void run() {
+            System.out.println("Hello from a runnable");
+        }
+    };
+
+    /** Runnable that increments {@link #COUNTER} once per execution. */
+    public static final IgniteRunnable TEST_RUNNABLE_COUNTER = new IgniteRunnable() {
+        private static final long serialVersionUID = 386219709871673366L;
+
+        @Override
+        public void run() {
+            COUNTER.incrementAndGet();
+        }
+    };
+
+    /** Event predicate that increments {@link #COUNTER} for every event and always returns {@code true}. */
+    public static final IgnitePredicate<Event> EVENT_COUNTER = new IgnitePredicate<Event>() {
+        private static final long serialVersionUID = -4214894278107593791L;
+
+        @Override
+        public boolean apply(Event event) {
+            COUNTER.incrementAndGet();
+            return true;
+        }
+    };
+
+    /** Callable that always returns the string {@code "hello"}. */
+    public static final IgniteCallable<String> TEST_CALLABLE = new IgniteCallable<String>() {
+        private static final long serialVersionUID = 986972344531961815L;
+
+        @Override
+        public String call() throws Exception {
+            return "hello";
+        }
+    };
+
+    /** Closure that prefixes its input with {@code "hello "}. */
+    public static final IgniteClosure<String, String> TEST_CLOSURE = new IgniteClosure<String, String>() {
+        private static final long serialVersionUID = -3969758431961263815L;
+
+        @Override
+        public String apply(String input) {
+            return "hello " + input;
+        }
+    };
+
+    /**
+     * Task that splits into {@code arg} jobs, job {@code i} returning {@code "a" + i}, and
+     * reduces the job results into a single comma-separated string.
+     */
+    public static final ComputeTask<Integer, String> COMPUTE_TASK = new ComputeTaskSplitAdapter<Integer, String>() {
+        private static final long serialVersionUID = 3040624379256407732L;
+
+        /**
+         * Joins the data of all job results with commas, in the order the results are given.
+         *
+         * @param results the job results to join
+         * @return the comma-separated result data, or an empty string if there are no results
+         */
+        @Override
+        public String reduce(List<ComputeJobResult> results) throws IgniteException {
+            StringBuilder answer = new StringBuilder();
+            for (ComputeJobResult res : results) {
+                answer.append(res.getData()).append(",");
+            }
+            // Drop the trailing comma; with no results there is nothing to drop, and an
+            // unconditional deleteCharAt(-1) would throw StringIndexOutOfBoundsException.
+            if (answer.length() > 0) {
+                answer.deleteCharAt(answer.length() - 1);
+            }
+            return answer.toString();
+        }
+
+        /**
+         * Creates {@code arg} jobs; the grid size is ignored.
+         *
+         * @param gridSize unused
+         * @param arg the number of jobs to create
+         * @return the jobs, in no particular order (they are collected in a HashSet)
+         */
+        @Override
+        protected Collection<? extends ComputeJob> split(int gridSize, final Integer arg) throws IgniteException {
+            Set<ComputeJob> answer = new HashSet<>();
+            for (int i = 0; i < arg; i++) {
+                // Effectively-final copy of the loop index for use inside the anonymous job.
+                final int c = i;
+                answer.add(new ComputeJob() {
+                    private static final long serialVersionUID = 3365213549618276779L;
+
+                    @Override
+                    public Object execute() throws IgniteException {
+                        return "a" + c;
+                    }
+
+                    @Override
+                    public void cancel() {
+                        // nothing
+                    }
+                });
+            }
+            return answer;
+        }
+    };
+
+    /**
+     * Reducer that collects strings and, on {@code reduce()}, returns them sorted and
+     * concatenated without a separator. The collected values are cleared afterwards so that
+     * the shared instance can be used again.
+     * NOTE(review): the values are kept in a plain ArrayList - confirm that collect() is never
+     * invoked concurrently on this instance.
+     */
+    public static final IgniteReducer<String, String> STRING_JOIN_REDUCER = new IgniteReducer<String, String>() {
+        private static final long serialVersionUID = 1L;
+        private List<String> list = Lists.newArrayList();
+
+        @Override
+        public boolean collect(String value) {
+            list.add(value);
+            // true: keep collecting further values.
+            return true;
+        }
+
+        @Override
+        public String reduce() {
+            // Sort first so that the outcome does not depend on the order of collection.
+            Collections.sort(list);
+            String answer = Joiner.on("").join(list);
+            list.clear();
+            return answer;
+        }
+    };
+
+    private TestIgniteComputeResources() {
+        // Constants holder; not meant to be instantiated.
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/test/resources/log4j.properties 
b/components/camel-ignite/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cb64298
--- /dev/null
+++ b/components/camel-ignite/src/test/resources/log4j.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger=INFO, file
+# change the logging level of this category to increase verbosity of the Ignite component
+log4j.category.org.apache.camel.component.ignite=INFO, file
+log4j.additivity.org.apache.camel.component.ignite=false
+
+# uncomment the following line to turn on Camel debugging
+#log4j.logger.org.apache.camel=DEBUG
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.file.file=target/camel-ignite-test.log

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/pom.xml
----------------------------------------------------------------------
diff --git a/components/pom.xml b/components/pom.xml
index fe40980..a68a15e 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -124,6 +124,7 @@
     <module>camel-hl7</module>
     <module>camel-ibatis</module>
     <module>camel-ical</module>
+    <module>camel-ignite</module>
     <module>camel-infinispan</module>
     <module>camel-irc</module>
     <module>camel-jackson</module>

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index f2e3db2..2839035 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -233,6 +233,7 @@
     <ibatis-bundle-version>2.3.4.726_4</ibatis-bundle-version>
     <ibatis-version>2.3.4.726</ibatis-version>
     <ical4j-version>1.0.7</ical4j-version>
+    <ignite-version>1.5.0-b1</ignite-version>
     <infinispan-version>8.1.0.Final</infinispan-version>
     <irclib-bundle-version>1.10_5</irclib-bundle-version>
     <irclib-version>1.10</irclib-version>

Reply via email to