Author: boday
Date: Fri Jul 6 22:13:51 2012
New Revision: 1358452
URL: http://svn.apache.org/viewvc?rev=1358452&view=rev
Log:
CAMEL-3211 added basic "pollMultiple" support to the pollEnrich EIP
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java?rev=1358452&r1=1358451&r2=1358452&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
Fri Jul 6 22:13:51 2012
@@ -48,14 +48,17 @@ public class PollEnrichDefinition extend
private String aggregationStrategyRef;
@XmlTransient
private AggregationStrategy aggregationStrategy;
+ @XmlAttribute
+ private Boolean pollMultiple;
public PollEnrichDefinition() {
}
- public PollEnrichDefinition(AggregationStrategy aggregationStrategy,
String resourceUri, long timeout) {
+ public PollEnrichDefinition(AggregationStrategy aggregationStrategy,
String resourceUri, long timeout, Boolean pollMultiple) {
this.aggregationStrategy = aggregationStrategy;
this.resourceUri = resourceUri;
this.timeout = timeout;
+ this.pollMultiple = pollMultiple;
}
@Override
@@ -93,10 +96,10 @@ public class PollEnrichDefinition extend
PollEnricher enricher;
if (timeout != null) {
- enricher = new PollEnricher(null,
endpoint.createPollingConsumer(), timeout);
+ enricher = new PollEnricher(null,
endpoint.createPollingConsumer(), timeout, pollMultiple);
} else {
// if no timeout then we should block, and there use a negative
timeout
- enricher = new PollEnricher(null,
endpoint.createPollingConsumer(), -1);
+ enricher = new PollEnricher(null,
endpoint.createPollingConsumer(), -1, pollMultiple);
}
if (aggregationStrategyRef != null) {
@@ -150,4 +153,12 @@ public class PollEnrichDefinition extend
public void setAggregationStrategy(AggregationStrategy
aggregationStrategy) {
this.aggregationStrategy = aggregationStrategy;
}
+
+ public Boolean isPollMultiple() {
+ return pollMultiple;
+ }
+
+ public void setPollMultiple(Boolean pollMultiple) {
+ this.pollMultiple = pollMultiple;
+ }
}
\ No newline at end of file
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=1358452&r1=1358451&r2=1358452&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
Fri Jul 6 22:13:51 2012
@@ -2823,7 +2823,52 @@ public abstract class ProcessorDefinitio
*/
@SuppressWarnings("unchecked")
public Type pollEnrich(String resourceUri) {
- addOutput(new PollEnrichDefinition(null, resourceUri, -1));
+ addOutput(new PollEnrichDefinition(null, resourceUri, -1, false));
+ return (Type) this;
+ }
+
+ /**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content
Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a
<code>resourceUri</code>
+ * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+ * <p/>
+ * The difference between this and {@link #enrich(String)} is that this
uses a consumer
+ * to obtain the additional data, whereas enrich uses a producer.
+ * <p/>
+ * This method will not wait for data to become available; use the method
with an explicit timeout
+ * if you want to wait for data for a period of time from the resourceUri.
+ *
+ * @param resourceUri URI of resource endpoint for obtaining
additional data.
+ * @param pollMultiple if enabled will poll for all Exchanges
available on the endpoint
+ * @return the builder
+ * @see org.apache.camel.processor.PollEnricher
+ */
+ @SuppressWarnings("unchecked")
+ public Type pollEnrich(String resourceUri, boolean pollMultiple) {
+ addOutput(new PollEnrichDefinition(null, resourceUri, 0,
pollMultiple));
+ return (Type) this;
+ }
+
+ /**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content
Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a
<code>resourceUri</code>
+ * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+ * <p/>
+ * The difference between this and {@link #enrich(String)} is that this
uses a consumer
+ * to obtain the additional data, whereas enrich uses a producer.
+ * <p/>
+ * This method waits up to <tt>timeout</tt> millis on each poll for data to
become available
+ * from the resourceUri; a timeout of zero polls without waiting for data on the
resourceUri.
+ *
+ * @param resourceUri URI of resource endpoint for obtaining
additional data.
+ * @param timeout timeout in millis to wait at most for data
to be available.
+ * @param pollMultiple if enabled will poll for all Exchanges
available on the endpoint
+ * @return the builder
+ * @see org.apache.camel.processor.PollEnricher
+ */
+ @SuppressWarnings("unchecked")
+ public Type pollEnrich(String resourceUri, long timeout, boolean
pollMultiple) {
+ addOutput(new PollEnrichDefinition(null, resourceUri, timeout,
pollMultiple));
return (Type) this;
}
@@ -2845,7 +2890,7 @@ public abstract class ProcessorDefinitio
*/
@SuppressWarnings("unchecked")
public Type pollEnrich(String resourceUri, AggregationStrategy
aggregationStrategy) {
- addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri,
-1));
+ addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri,
-1, false));
return (Type) this;
}
@@ -2869,7 +2914,7 @@ public abstract class ProcessorDefinitio
*/
@SuppressWarnings("unchecked")
public Type pollEnrich(String resourceUri, long timeout,
AggregationStrategy aggregationStrategy) {
- addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri,
timeout));
+ addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri,
timeout, false));
return (Type) this;
}
@@ -2892,7 +2937,7 @@ public abstract class ProcessorDefinitio
*/
@SuppressWarnings("unchecked")
public Type pollEnrich(String resourceUri, long timeout) {
- addOutput(new PollEnrichDefinition(null, resourceUri, timeout));
+ addOutput(new PollEnrichDefinition(null, resourceUri, timeout, false));
return (Type) this;
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java?rev=1358452&r1=1358451&r2=1358452&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
Fri Jul 6 22:13:51 2012
@@ -16,6 +16,9 @@
*/
package org.apache.camel.processor;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
@@ -47,6 +50,7 @@ public class PollEnricher extends Servic
private AggregationStrategy aggregationStrategy;
private PollingConsumer consumer;
private long timeout;
+ private Boolean pollMultiple;
/**
* Creates a new {@link PollEnricher}. The default aggregation strategy is
to
@@ -57,7 +61,7 @@ public class PollEnricher extends Servic
* @param consumer consumer to resource endpoint.
*/
public PollEnricher(PollingConsumer consumer) {
- this(defaultAggregationStrategy(), consumer, 0);
+ this(defaultAggregationStrategy(), consumer, 0, false);
}
/**
@@ -74,6 +78,21 @@ public class PollEnricher extends Servic
}
/**
+ * Creates a new {@link PollEnricher}.
+ *
+ * @param aggregationStrategy aggregation strategy to aggregate input
data and additional data.
+ * @param consumer consumer to resource endpoint.
+ * @param timeout timeout in millis
+ * @param pollMultiple if enabled, builds a List of all exchanges polled from the consumer
+ */
+ public PollEnricher(AggregationStrategy aggregationStrategy,
PollingConsumer consumer, long timeout, Boolean pollMultiple) {
+ this.aggregationStrategy = aggregationStrategy;
+ this.consumer = consumer;
+ this.timeout = timeout;
+ this.pollMultiple = pollMultiple;
+ }
+
+ /**
* Sets the aggregation strategy for this poll enricher.
*
* @param aggregationStrategy the aggregationStrategy to set
@@ -100,6 +119,10 @@ public class PollEnricher extends Servic
this.timeout = timeout;
}
+ public void setPollMultiple(Boolean value) {
+ this.pollMultiple = value;
+ }
+
/**
* Enriches the input data (<code>exchange</code>) by first obtaining
* additional data from an endpoint represented by an endpoint
@@ -113,47 +136,70 @@ public class PollEnricher extends Servic
* @param exchange input data.
*/
public void process(Exchange exchange) throws Exception {
+
preCheckPoll(exchange);
- Exchange resourceExchange;
- if (timeout < 0) {
- LOG.debug("Consumer receive: {}", consumer);
- resourceExchange = consumer.receive();
- } else if (timeout == 0) {
- LOG.debug("Consumer receiveNoWait: {}", consumer);
- resourceExchange = consumer.receiveNoWait();
- } else {
- LOG.debug("Consumer receive with timeout: {} ms. {}", timeout,
consumer);
- resourceExchange = consumer.receive(timeout);
- }
+ if (pollMultiple != null && pollMultiple) {
- if (resourceExchange == null) {
- LOG.debug("Consumer received no exchange");
- } else {
- LOG.debug("Consumer received: {}", resourceExchange);
- }
+ List<Exchange> exchangeList = new ArrayList<Exchange>();
+ Exchange receivedExchange;
+ while (true) {
+ if (timeout == 0) {
+ LOG.debug("Polling Consumer receiveNoWait: {}", consumer);
+ receivedExchange = consumer.receiveNoWait();
+ } else {
+ LOG.debug("Polling Consumer receive with timeout: {} ms.
{}", timeout, consumer);
+ receivedExchange = consumer.receive(timeout);
+ }
- if (resourceExchange != null && resourceExchange.isFailed()) {
- // copy resource exchange onto original exchange (preserving
pattern)
- copyResultsPreservePattern(exchange, resourceExchange);
+ if (receivedExchange == null) {
+ break;
+ }
+ exchangeList.add(receivedExchange);
+ }
+ exchange.getIn().setBody(exchangeList);
} else {
- prepareResult(exchange);
-
- // prepare the exchanges for aggregation
- ExchangeHelper.prepareAggregation(exchange, resourceExchange);
- // must catch any exception from aggregation
- Exchange aggregatedExchange;
- try {
- aggregatedExchange = aggregationStrategy.aggregate(exchange,
resourceExchange);
- } catch (Throwable e) {
- throw new CamelExchangeException("Error occurred during
aggregation", exchange, e);
+
+ Exchange resourceExchange;
+ if (timeout < 0) {
+ LOG.debug("Consumer receive: {}", consumer);
+ resourceExchange = consumer.receive();
+ } else if (timeout == 0) {
+ LOG.debug("Consumer receiveNoWait: {}", consumer);
+ resourceExchange = consumer.receiveNoWait();
+ } else {
+ LOG.debug("Consumer receive with timeout: {} ms. {}", timeout,
consumer);
+ resourceExchange = consumer.receive(timeout);
+ }
+
+ if (resourceExchange == null) {
+ LOG.debug("Consumer received no exchange");
+ } else {
+ LOG.debug("Consumer received: {}", resourceExchange);
}
- if (aggregatedExchange != null) {
- // copy aggregation result onto original exchange (preserving
pattern)
- copyResultsPreservePattern(exchange, aggregatedExchange);
- // handover any synchronization
- if (resourceExchange != null) {
- resourceExchange.handoverCompletions(exchange);
+
+ if (resourceExchange != null && resourceExchange.isFailed()) {
+ // copy resource exchange onto original exchange (preserving
pattern)
+ copyResultsPreservePattern(exchange, resourceExchange);
+ } else {
+ prepareResult(exchange);
+
+ // prepare the exchanges for aggregation
+ ExchangeHelper.prepareAggregation(exchange, resourceExchange);
+ // must catch any exception from aggregation
+ Exchange aggregatedExchange;
+ try {
+ aggregatedExchange =
aggregationStrategy.aggregate(exchange, resourceExchange);
+ } catch (Throwable e) {
+ throw new CamelExchangeException("Error occurred during
aggregation", exchange, e);
+ }
+ if (aggregatedExchange != null) {
+ // copy aggregation result onto original exchange
(preserving pattern)
+ copyResultsPreservePattern(exchange, aggregatedExchange);
+ // handover any synchronization
+ if (resourceExchange != null) {
+ resourceExchange.handoverCompletions(exchange);
+ }
}
}
}
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java?rev=1358452&r1=1358451&r2=1358452&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java
Fri Jul 6 22:13:51 2012
@@ -16,6 +16,9 @@
*/
package org.apache.camel.processor.enricher;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -116,6 +119,41 @@ public class PollEnricherTest extends Co
assertNull(exchange.getException());
}
+ public void testPollEnrichMultipleDefaultNoWait() throws
InterruptedException {
+
+ mock.expectedMessageCount(1);
+ template.sendBody("seda:foo5", "msg1");
+ template.sendBody("seda:foo5", "msg2");
+ template.sendBody("seda:enricher-test-5", "test");
+ Thread.sleep(100);
+ template.sendBody("seda:foo5", "msg3");
+ template.sendBody("seda:foo5", "msg4");
+
+ List<Exchange> polledExchanges =
mock.getExchanges().get(0).getIn().getBody(List.class);
+ assertEquals(2, polledExchanges.size());
+
+ mock.expectedHeaderReceived(Exchange.TO_ENDPOINT, "seda://foo5");
+ mock.assertIsSatisfied(0);
+ }
+
+ public void testPollEnrichMultipleExplicitTimeout() throws
InterruptedException {
+
+ mock.expectedMessageCount(1);
+ template.sendBody("seda:foo6", "msg1");
+ template.sendBody("seda:foo6", "msg2");
+ template.sendBody("seda:enricher-test-6", "test");
+ template.sendBody("seda:foo6", "msg3");
+ template.sendBody("seda:foo6", "msg4");
+
+ Thread.sleep(500);
+
+ List<Exchange> polledExchanges =
mock.getExchanges().get(0).getIn().getBody(List.class);
+ assertEquals(4, polledExchanges.size());
+
+ mock.expectedHeaderReceived(Exchange.TO_ENDPOINT, "seda://foo6");
+ mock.assertIsSatisfied(0);
+ }
+
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
@@ -141,6 +179,18 @@ public class PollEnricherTest extends Co
from("direct:enricher-test-4")
.pollEnrich("seda:foo4", aggregationStrategy);
+
+ //
-------------------------------------------------------------
+ // Poll Multiple routes
+ //
-------------------------------------------------------------
+
+ from("seda:enricher-test-5")
+ .pollEnrich("seda:foo5", true)
+ .to("mock:mock");
+
+ from("seda:enricher-test-6")
+ .pollEnrich("seda:foo6", 200, true)
+ .to("mock:mock");
}
};
}
Modified:
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml?rev=1358452&r1=1358451&r2=1358452&view=diff
==============================================================================
---
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
(original)
+++
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
Fri Jul 6 22:13:51 2012
@@ -48,6 +48,19 @@
<pollEnrich uri="seda:foo4" strategyRef="sampleAggregator"/>
<to uri="mock:mock"/>
</route>
+
+ <route>
+ <from uri="seda:enricher-test-5"/>
+ <pollEnrich uri="seda:foo5" pollMultiple="true"/>
+ <to uri="mock:mock"/>
+ </route>
+
+ <route>
+ <from uri="seda:enricher-test-6"/>
+ <pollEnrich uri="seda:foo6" timeout="200" pollMultiple="true"/>
+ <to uri="mock:mock"/>
+ </route>
+
</camelContext>
<bean id="sampleAggregator"
class="org.apache.camel.processor.enricher.SampleAggregator"/>
Modified:
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml?rev=1358452&r1=1358451&r2=1358452&view=diff
==============================================================================
---
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
(original)
+++
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
Fri Jul 6 22:13:51 2012
@@ -28,6 +28,8 @@
<endpoint id="foo2" uri="seda:foo2"/>
<endpoint id="foo3" uri="seda:foo3"/>
<endpoint id="foo4" uri="seda:foo4"/>
+ <endpoint id="foo5" uri="seda:foo5"/>
+ <endpoint id="foo6" uri="seda:foo6"/>
<!-- START SNIPPET: e1 -->
<route>
@@ -54,6 +56,19 @@
<pollEnrich ref="foo4" strategyRef="sampleAggregator"/>
<to uri="mock:mock"/>
</route>
+
+ <route>
+ <from uri="seda:enricher-test-5"/>
+ <pollEnrich ref="foo5" pollMultiple="true"/>
+ <to uri="mock:mock"/>
+ </route>
+
+ <route>
+ <from uri="seda:enricher-test-6"/>
+ <pollEnrich ref="foo6" timeout="200" pollMultiple="true"/>
+ <to uri="mock:mock"/>
+ </route>
+
</camelContext>
<bean id="sampleAggregator"
class="org.apache.camel.processor.enricher.SampleAggregator"/>