CAMEL-6372: Added support for maxMessagesPerPoll on camel-krati.
Conflicts:
components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/46a55b30
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/46a55b30
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/46a55b30
Branch: refs/heads/camel-2.10.x
Commit: 46a55b302ded54e3ede7a9a915ebc7fb3a9b72e4
Parents: f8087e2
Author: Claus Ibsen <[email protected]>
Authored: Fri May 17 09:13:15 2013 +0200
Committer: Claus Ibsen <[email protected]>
Committed: Fri May 17 09:16:32 2013 +0200
----------------------------------------------------------------------
.../camel/component/krati/KratiConsumer.java | 12 ++-
.../camel/component/krati/KratiEndpoint.java | 17 +++-
.../krati/KratiConsumerMaxMessagesPerPollTest.java | 65 +++++++++++++++
.../camel/component/krati/KratiConsumerTest.java | 9 +-
.../camel/component/krati/KratiEndpointTest.java | 2 -
.../component/krati/KratiProducerSpringTest.java | 28 +++----
.../camel/component/krati/KratiProducerTest.java | 27 +++----
7 files changed, 118 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/46a55b30/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
----------------------------------------------------------------------
diff --git
a/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
b/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
index 4fea54c..24694c1 100644
---
a/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
+++
b/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
@@ -30,7 +30,6 @@ import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* The Krati consumer.
*/
@@ -51,18 +50,27 @@ public class KratiConsumer extends
ScheduledBatchPollingConsumer {
protected int poll() throws Exception {
shutdownRunningTask = null;
pendingExchanges = 0;
+ int max = getMaxMessagesPerPoll() > 0 ? getMaxMessagesPerPoll() :
Integer.MAX_VALUE;
Queue<Exchange> queue = new LinkedList<Exchange>();
Iterator<Object> keyIterator = dataStore.keyIterator();
- while (keyIterator.hasNext()) {
+ int index = 0;
+ while (keyIterator.hasNext() && index < max) {
Object key = keyIterator.next();
Object value = dataStore.get(key);
Exchange exchange = endpoint.createExchange();
exchange.setProperty(KratiConstants.KEY, key);
exchange.getIn().setBody(value);
queue.add(exchange);
+ index++;
+ }
+
+ // did we cap at max?
+ if (index == max && keyIterator.hasNext()) {
+ log.debug("Limiting to maximum messages to poll {} as there was
more messages in this poll.", max);
}
+
return queue.isEmpty() ? 0 : processBatch(CastUtils.cast(queue));
}
http://git-wip-us.apache.org/repos/asf/camel/blob/46a55b30/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java
----------------------------------------------------------------------
diff --git
a/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java
b/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java
index b419445..435086b 100644
---
a/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java
+++
b/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java
@@ -20,6 +20,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
+
import krati.core.segment.ChannelSegmentFactory;
import krati.core.segment.SegmentFactory;
import krati.io.Serializer;
@@ -30,12 +31,12 @@ import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.krati.serializer.KratiDefaultSerializer;
-import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.ScheduledPollEndpoint;
/**
* Represents a Krati endpoint.
*/
-public class KratiEndpoint extends DefaultEndpoint {
+public class KratiEndpoint extends ScheduledPollEndpoint {
protected static Map<String, KratiDataStoreRegistration> dataStoreRegistry
= new HashMap<String, KratiDataStoreRegistration>();
@@ -53,6 +54,7 @@ public class KratiEndpoint extends DefaultEndpoint {
protected HashFunction<byte[]> hashFunction = new FnvHashFunction();
protected String path;
+ protected int maxMessagesPerPoll;
public KratiEndpoint(String uri, KratiComponent component) throws
URISyntaxException {
super(uri, component);
@@ -91,7 +93,10 @@ public class KratiEndpoint extends DefaultEndpoint {
dataStore = KratiHelper.createDataStore(path, initialCapacity,
segmentFileSize, segmentFactory, hashFunction, keySerializer, valueSerializer);
dataStoreRegistry.put(path, new
KratiDataStoreRegistration(dataStore));
}
- return new KratiConsumer(this, processor, dataStore);
+ KratiConsumer answer = new KratiConsumer(this, processor, dataStore);
+ answer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
+ configureConsumer(answer);
+ return answer;
}
public boolean isSingleton() {
@@ -177,5 +182,11 @@ public class KratiEndpoint extends DefaultEndpoint {
return path;
}
+ public int getMaxMessagesPerPoll() {
+ return maxMessagesPerPoll;
+ }
+ public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+ this.maxMessagesPerPoll = maxMessagesPerPoll;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/46a55b30/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerMaxMessagesPerPollTest.java
----------------------------------------------------------------------
diff --git
a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerMaxMessagesPerPollTest.java
b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerMaxMessagesPerPollTest.java
new file mode 100644
index 0000000..3baf1af
--- /dev/null
+++
b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerMaxMessagesPerPollTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.krati;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class KratiConsumerMaxMessagesPerPollTest extends CamelTestSupport {
+
+ @Test
+ public void testPutAndConsume() throws InterruptedException {
+ MockEndpoint endpoint = context.getEndpoint("mock:results",
MockEndpoint.class);
+ // batch-1
+ endpoint.message(0).property(Exchange.BATCH_SIZE).isEqualTo(2);
+ endpoint.message(0).property(Exchange.BATCH_INDEX).isEqualTo(0);
+ endpoint.message(0).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+ endpoint.message(1).property(Exchange.BATCH_SIZE).isEqualTo(2);
+ endpoint.message(1).property(Exchange.BATCH_INDEX).isEqualTo(1);
+ endpoint.message(1).property(Exchange.BATCH_COMPLETE).isEqualTo(true);
+
+ // batch-2
+ endpoint.message(2).property(Exchange.BATCH_SIZE).isEqualTo(1);
+ endpoint.message(2).property(Exchange.BATCH_INDEX).isEqualTo(0);
+ endpoint.message(2).property(Exchange.BATCH_COMPLETE).isEqualTo(true);
+
+ endpoint.expectedMessageCount(3);
+
+ template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY,
"1");
+ template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY,
"2");
+ template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY,
"3");
+
+ endpoint.assertIsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:put")
+ .to("krati:target/test/consumertest");
+
+ from("krati:target/test/consumertest?maxMessagesPerPoll=2")
+ .to("mock:results");
+
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/46a55b30/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerTest.java
----------------------------------------------------------------------
diff --git
a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerTest.java
b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerTest.java
index 4be1388..011ae90 100644
---
a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerTest.java
+++
b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerTest.java
@@ -14,10 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.camel.component.krati;
-import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
@@ -27,12 +25,13 @@ public class KratiConsumerTest extends CamelTestSupport {
@Test
public void testPutAndConsume() throws InterruptedException {
- ProducerTemplate template = context.createProducerTemplate();
+ MockEndpoint endpoint = context.getEndpoint("mock:results",
MockEndpoint.class);
+ endpoint.expectedMessageCount(3);
+
template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY,
"1");
template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY,
"2");
template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY,
"3");
- MockEndpoint endpoint = context.getEndpoint("mock:results",
MockEndpoint.class);
- endpoint.expectedMessageCount(3);
+
endpoint.assertIsSatisfied();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/46a55b30/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiEndpointTest.java
----------------------------------------------------------------------
diff --git
a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiEndpointTest.java
b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiEndpointTest.java
index 6f4e320..94bd86c 100644
---
a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiEndpointTest.java
+++
b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiEndpointTest.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.camel.component.krati;
import org.junit.Test;
@@ -32,7 +31,6 @@ public class KratiEndpointTest {
endpoint.start();
endpoint.stop();
assertEquals("target/test/endpointtest", endpoint.getPath());
-
}
@Test
http://git-wip-us.apache.org/repos/asf/camel/blob/46a55b30/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerSpringTest.java
----------------------------------------------------------------------
diff --git
a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerSpringTest.java
b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerSpringTest.java
index aa728d4..bfbe3f7 100644
---
a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerSpringTest.java
+++
b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerSpringTest.java
@@ -14,10 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.camel.component.krati;
-import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelSpringTestSupport;
import org.junit.Test;
@@ -27,26 +25,26 @@ import
org.springframework.context.support.ClassPathXmlApplicationContext;
public class KratiProducerSpringTest extends CamelSpringTestSupport {
@Test
- public void testPut() throws InterruptedException {
- ProducerTemplate template = context.createProducerTemplate();
+ public void testPut() throws Exception {
+ MockEndpoint endpoint = context.getEndpoint("mock:results",
MockEndpoint.class);
+ endpoint.expectedMessageCount(3);
+
template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY,
"1");
template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY,
"2");
template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY,
"3");
- MockEndpoint endpoint = context.getEndpoint("mock:results",
MockEndpoint.class);
- endpoint.expectedMessageCount(3);
+
endpoint.assertIsSatisfied();
}
-
@Test
- public void testPutAndGet() throws InterruptedException {
- ProducerTemplate template = context.createProducerTemplate();
+ public void testPutAndGet() throws Exception {
+ MockEndpoint endpoint = context.getEndpoint("mock:results",
MockEndpoint.class);
+ endpoint.expectedMessageCount(3);
+
template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY,
"1");
template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY,
"2");
template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY,
"3");
- MockEndpoint endpoint = context.getEndpoint("mock:results",
MockEndpoint.class);
- endpoint.expectedMessageCount(3);
endpoint.assertIsSatisfied();
Object result = template.requestBodyAndHeader("direct:get", null,
KratiConstants.KEY, "3");
@@ -54,23 +52,23 @@ public class KratiProducerSpringTest extends
CamelSpringTestSupport {
}
@Test
- public void testPutDeleteAndGet() throws InterruptedException {
- ProducerTemplate template = context.createProducerTemplate();
+ public void testPutDeleteAndGet() throws Exception {
template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY,
"1");
template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY,
"2");
template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY,
"3");
template.requestBodyAndHeader("direct:delete", null,
KratiConstants.KEY, "3");
+
Object result = template.requestBodyAndHeader("direct:get", null,
KratiConstants.KEY, "3");
assertEquals(null, result);
}
@Test
- public void testPutDeleteAllAndGet() throws InterruptedException {
- ProducerTemplate template = context.createProducerTemplate();
+ public void testPutDeleteAllAndGet() throws Exception {
template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY,
"1");
template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY,
"2");
template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY,
"3");
template.requestBodyAndHeader("direct:deleteall", null,
KratiConstants.KEY, "3");
+
Object result = template.requestBodyAndHeader("direct:get", null,
KratiConstants.KEY, "1");
assertEquals(null, result);
result = template.requestBodyAndHeader("direct:get", null,
KratiConstants.KEY, "2");
http://git-wip-us.apache.org/repos/asf/camel/blob/46a55b30/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java
----------------------------------------------------------------------
diff --git
a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java
b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java
index 2e499f0..ce6e05e 100644
---
a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java
+++
b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java
@@ -14,12 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.camel.component.krati;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
@@ -28,30 +26,29 @@ import org.junit.Test;
public class KratiProducerTest extends CamelTestSupport {
@Test
- public void testPut() throws InterruptedException {
- ProducerTemplate template = context.createProducerTemplate();
+ public void testPut() throws Exception {
+ MockEndpoint endpoint = context.getEndpoint("mock:results",
MockEndpoint.class);
+ endpoint.expectedMessageCount(3);
+
template.sendBodyAndHeader("direct:put", new ValueObject("TEST1"),
KratiConstants.KEY, new KeyObject("1"));
template.sendBodyAndHeader("direct:put", new ValueObject("TEST2"),
KratiConstants.KEY, new KeyObject("2"));
template.sendBodyAndHeader("direct:put", new ValueObject("TEST3"),
KratiConstants.KEY, new KeyObject("3"));
- MockEndpoint endpoint = context.getEndpoint("mock:results",
MockEndpoint.class);
- endpoint.expectedMessageCount(3);
+
endpoint.assertIsSatisfied();
}
-
@Test
- public void testPutAndGet() throws InterruptedException {
- ProducerTemplate template = context.createProducerTemplate();
+ public void testPutAndGet() throws Exception {
template.sendBodyAndHeader("direct:put", new ValueObject("TEST1"),
KratiConstants.KEY, new KeyObject("1"));
template.sendBodyAndHeader("direct:put", new ValueObject("TEST2"),
KratiConstants.KEY, new KeyObject("2"));
template.sendBodyAndHeader("direct:put", new ValueObject("TEST3"),
KratiConstants.KEY, new KeyObject("3"));
+
Object result = template.requestBodyAndHeader("direct:get", null,
KratiConstants.KEY, new KeyObject("3"));
assertEquals(new ValueObject("TEST3"), result);
}
@Test
- public void testPutAndGetPreserveHeaders() throws InterruptedException {
- ProducerTemplate template = context.createProducerTemplate();
+ public void testPutAndGetPreserveHeaders() throws Exception {
template.sendBodyAndHeader("direct:put", new ValueObject("TEST1"),
KratiConstants.KEY, new KeyObject("1"));
template.sendBodyAndHeader("direct:put", new ValueObject("TEST2"),
KratiConstants.KEY, new KeyObject("2"));
template.sendBodyAndHeader("direct:put", new ValueObject("TEST3"),
KratiConstants.KEY, new KeyObject("3"));
@@ -69,23 +66,23 @@ public class KratiProducerTest extends CamelTestSupport {
}
@Test
- public void testPutDeleteAndGet() throws InterruptedException {
- ProducerTemplate template = context.createProducerTemplate();
+ public void testPutDeleteAndGet() throws Exception {
template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY,
"1");
template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY,
"2");
template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY,
"4");
template.requestBodyAndHeader("direct:delete", null,
KratiConstants.KEY, "4");
+
Object result = template.requestBodyAndHeader("direct:get", null,
KratiConstants.KEY, "4");
assertEquals(null, result);
}
@Test
- public void testPutDeleteAllAndGet() throws InterruptedException {
- ProducerTemplate template = context.createProducerTemplate();
+ public void testPutDeleteAllAndGet() throws Exception {
template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY,
"1");
template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY,
"2");
template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY,
"3");
template.requestBodyAndHeader("direct:deleteall", null,
KratiConstants.KEY, "3");
+
Object result = template.requestBodyAndHeader("direct:get", null,
KratiConstants.KEY, "1");
assertEquals(null, result);
result = template.requestBodyAndHeader("direct:get", null,
KratiConstants.KEY, "2");