Updated Branches: refs/heads/camel-2.11.x 75a171143 -> 2d5929fd3
Seda consumers should validate that they all have the same multiple consumers option, as they cannot have different values. This is per queue. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2d5929fd Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2d5929fd Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2d5929fd Branch: refs/heads/camel-2.11.x Commit: 2d5929fd3444c8a882ceedd72819915fc39ee199 Parents: 75a1711 Author: Claus Ibsen <[email protected]> Authored: Wed Jun 12 16:00:29 2013 -0400 Committer: Claus Ibsen <[email protected]> Committed: Thu Jun 13 14:07:09 2013 +0200 ---------------------------------------------------------------------- .../camel/component/seda/SedaComponent.java | 22 +++++- .../camel/component/seda/SedaEndpoint.java | 13 +++- ...edaQueueMultipleConsumersDifferenceTest.java | 70 ++++++++++++++++++++ 3 files changed, 102 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2d5929fd/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java index 8cb2b2b..7528fa8 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java @@ -56,7 +56,15 @@ public class SedaComponent extends DefaultComponent { return defaultConcurrentConsumers; } + /** + * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean)} + */ + @Deprecated public synchronized QueueReference getOrCreateQueue(String uri, Integer size) { + return getOrCreateQueue(uri, size, null); + } + + public synchronized QueueReference 
getOrCreateQueue(String uri, Integer size, Boolean multipleConsumers) { String key = getQueueKey(uri); QueueReference ref = getQueues().get(key); @@ -92,7 +100,7 @@ public class SedaComponent extends DefaultComponent { log.debug("Created queue {} with size {}", key, size); // create and add a new reference queue - ref = new QueueReference(queue, size); + ref = new QueueReference(queue, size, multipleConsumers); ref.addReference(); getQueues().put(key, ref); @@ -103,6 +111,10 @@ public class SedaComponent extends DefaultComponent { return queues; } + public QueueReference getQueueReference(String key) { + return queues.get(key); + } + @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, defaultConcurrentConsumers); @@ -160,10 +172,12 @@ public class SedaComponent extends DefaultComponent { private final BlockingQueue<Exchange> queue; private volatile int count; private Integer size; + private Boolean multipleConsumers; - private QueueReference(BlockingQueue<Exchange> queue, Integer size) { + private QueueReference(BlockingQueue<Exchange> queue, Integer size, Boolean multipleConsumers) { this.queue = queue; this.size = size; + this.multipleConsumers = multipleConsumers; } void addReference() { @@ -190,6 +204,10 @@ public class SedaComponent extends DefaultComponent { return size; } + public Boolean getMultipleConsumers() { + return multipleConsumers; + } + /** * Gets the queue */ http://git-wip-us.apache.org/repos/asf/camel/blob/2d5929fd/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java index 3e1fa2b..7faa956 100644 --- 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java @@ -95,6 +95,17 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, } public Consumer createConsumer(Processor processor) throws Exception { + if (getComponent() != null) { + // all consumers must match having the same multipleConsumers options + String key = getComponent().getQueueKey(getEndpointUri()); + SedaComponent.QueueReference ref = getComponent().getQueueReference(key); + if (ref != null && ref.getMultipleConsumers() != isMultipleConsumers()) { + // there is already a multiple consumers, so make sure they matches + throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue multiple consumers " + + ref.getMultipleConsumers() + " does not match given multiple consumers " + multipleConsumers); + } + } + Consumer answer = new SedaConsumer(this, processor); configureConsumer(answer); return answer; @@ -108,7 +119,7 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, if (getComponent() != null) { // use null to indicate default size (= use what the existing queue has been configured with) Integer size = getSize() == Integer.MAX_VALUE ? null : getSize(); - SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size); + SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size, isMultipleConsumers()); queue = ref.getQueue(); String key = getComponent().getQueueKey(getEndpointUri()); LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, ref.getSize() != null ? 
ref.getSize() : Integer.MAX_VALUE}); http://git-wip-us.apache.org/repos/asf/camel/blob/2d5929fd/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java new file mode 100644 index 0000000..98856df --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.seda; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ResolveEndpointFailedException; +import org.apache.camel.builder.RouteBuilder; + +/** + * + */ +public class SameSedaQueueMultipleConsumersDifferenceTest extends ContextTestSupport { + + public void testSameOptions() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World"); + + template.sendBody("seda:foo?multipleConsumers=true", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + public void testSameOptionsProducerStillOkay() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World"); + + template.sendBody("seda:foo", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + public void testAddConsumer() throws Exception { + try { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo").routeId("fail").to("mock:fail"); + } + }); + fail("Should have thrown exception"); + } catch (IllegalArgumentException e) { + assertEquals("Cannot use existing queue seda://foo as the existing queue multiple consumers true does not match given multiple consumers false", e.getMessage()); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo?multipleConsumers=true").routeId("foo").to("mock:foo"); + from("seda:foo?multipleConsumers=true").routeId("bar").to("mock:bar"); + } + }; + } +}
