Author: davsclaus
Date: Tue Oct 30 16:09:35 2012
New Revision: 1403760
URL: http://svn.apache.org/viewvc?rev=1403760&view=rev
Log:
CAMEL-5579: Added AbstractListAggregationStrategy to make it easier to
aggregate a List<V> with the aggregator eip. Thanks to Claudio Corsi for
partial patch.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
(with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
(with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java?rev=1403760&view=auto
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
(added)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
Tue Oct 30 16:09:35 2012
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+
+/**
+ * Aggregate all exchanges into a {@link List} of values defined by the {@link
#getValue(Exchange)} call.
+ * The combined Exchange will hold all the aggregated exchanges in a {@link
java.util.List}
+ * as an exchange property with the key {@link
org.apache.camel.Exchange#GROUPED_EXCHANGE}.
+ * <p/>
+ * The method {@link #isStoreAsBodyOnCompletion()} determines if the
aggregated {@link List} should
+ * be stored on the {@link org.apache.camel.Message#setBody(Object)} or be
kept as a property
+ * on the exchange.
+ * <br/>
+ * The default behavior, storing the list as the message body, makes it easier to group
together a list of values
+ * and have the result stored as a {@link List} on the completed {@link
Exchange}.
+ *
+ * @since 2.11
+ */
+public abstract class AbstractListAggregationStrategy<V> implements
CompletionAwareAggregationStrategy {
+
+ /**
+ * This method is implemented by the sub-class and is called to retrieve
+ * the value that will be added to the aggregated {@link List} for the
+ * given exchange.
+ * <p/>
+ * If <tt>null</tt> is returned, then the value is <b>not</b> added to the
{@link List}.
+ *
+ * @param exchange The exchange to retrieve the value from
+ * @return The value of type V associated with the passed exchange,
or <tt>null</tt> to add nothing for this exchange
+ */
+ public abstract V getValue(Exchange exchange);
+
+ /**
+ * Whether to store the completed aggregated {@link List} as message body,
or to keep it as a property on the exchange.
+ * <p/>
+ * The default behavior is <tt>true</tt> to store as message body.
+ *
+ * @return <tt>true</tt> to store as message body, <tt>false</tt> to keep
as property on the exchange.
+ */
+ public boolean isStoreAsBodyOnCompletion() {
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void onCompletion(Exchange exchange) {
+ if (isStoreAsBodyOnCompletion()) { // move the list from the exchange property to the IN message body
+ List<V> list = (List<V>)
exchange.removeProperty(Exchange.GROUPED_EXCHANGE);
+ if (list != null) { // only replace the body when a list was actually stored as property
+ exchange.getIn().setBody(list);
+ }
+ }
+ }
+
+ /**
+ * This method will aggregate the old and new exchange and return the
result.
+ * <p/>
+ * When <tt>oldExchange</tt> is <tt>null</tt> a new {@link DefaultExchange} is
+ * created from <tt>newExchange</tt> and returned; otherwise <tt>oldExchange</tt>
+ * is returned. The {@link List} is kept as an exchange property with the key
+ * {@link Exchange#GROUPED_EXCHANGE} on the returned exchange, and is created
+ * if that property is not set yet. The value from {@link #getValue(Exchange)}
+ * is only added to the list when it is not <tt>null</tt>.
+ * <p/>
+ * NOTE(review): when <tt>oldExchange</tt> is <tt>null</tt>, <tt>newExchange</tt> is
+ * passed to the {@link DefaultExchange} constructor before it is checked for
+ * <tt>null</tt>, so presumably both arguments must not be <tt>null</tt> at the
+ * same time - confirm against the callers.
+ *
+ * @param oldExchange The oldest exchange, can be null
+ * @param newExchange The newest exchange, can be null
+ * @return a composite exchange of the old and/or new exchanges
+ */
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ List<V> list;
+ Exchange answer = oldExchange;
+
+ if (oldExchange == null) {
+ answer = new DefaultExchange(newExchange);
+ list = getList(answer);
+ } else {
+ list = getList(oldExchange);
+ }
+
+ if (newExchange != null) {
+ V value = getValue(newExchange);
+ if (value != null) {
+ list.add(value);
+ }
+ }
+
+ return answer;
+ }
+
+ @SuppressWarnings("unchecked")
+ private List<V> getList(Exchange exchange) {
+ List<V> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE,
List.class);
+ if (list == null) { // no list stored yet: create one and attach it as exchange property
+ list = new ArrayList<V>();
+ exchange.setProperty(Exchange.GROUPED_EXCHANGE, list);
+ }
+ return list;
+ }
+
+}
Propchange:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AbstractListAggregationStrategy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java?rev=1403760&r1=1403759&r2=1403760&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
Tue Oct 30 16:09:35 2012
@@ -16,11 +16,7 @@
*/
package org.apache.camel.processor.aggregate;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultExchange;
/**
* Aggregate all exchanges into a single combined Exchange holding all the
aggregated exchanges
@@ -29,25 +25,17 @@ import org.apache.camel.impl.DefaultExch
*
* @version
*/
-public class GroupedExchangeAggregationStrategy implements AggregationStrategy
{
+public class GroupedExchangeAggregationStrategy extends
AbstractListAggregationStrategy<Exchange> {
+
+ @Override
+ public boolean isStoreAsBodyOnCompletion() {
+ // keep the list as an exchange property (not as the message body) to stay compatible with the old behavior
+ return false;
+ }
- @SuppressWarnings("unchecked")
- public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
- List<Exchange> list;
- Exchange answer = oldExchange;
-
- if (oldExchange == null) {
- answer = new DefaultExchange(newExchange);
- list = new ArrayList<Exchange>();
- answer.setProperty(Exchange.GROUPED_EXCHANGE, list);
- } else {
- list = oldExchange.getProperty(Exchange.GROUPED_EXCHANGE,
List.class);
- }
-
- if (newExchange != null) {
- list.add(newExchange);
- }
- return answer;
+ @Override
+ public Exchange getValue(Exchange exchange) {
+ return exchange; // the exchange itself is the value collected into the list
}
}
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java?rev=1403760&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
Tue Oct 30 16:09:35 2012
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy;
+
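+/**
+ * Tests aggregating message bodies into a {@link List} using a custom {@link AbstractListAggregationStrategy}.
+ */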
+public class CustomListAggregationStrategyTest extends ContextTestSupport {
+
+ @SuppressWarnings("unchecked")
+ public void testCustomAggregationStrategy() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+
+ template.sendBodyAndHeader("direct:start", "100", "id", "1");
+ template.sendBodyAndHeader("direct:start", "150", "id", "1");
+ template.sendBodyAndHeader("direct:start", "130", "id", "1");
+
+ assertMockEndpointsSatisfied();
+
+ // the strategy below does not override isStoreAsBodyOnCompletion, so the list is expected as the message body
+ List<Integer> numbers =
result.getExchanges().get(0).getIn().getBody(List.class);
+ assertNotNull(numbers);
+ assertEquals(Integer.valueOf("100"), numbers.get(0));
+ assertEquals(Integer.valueOf("150"), numbers.get(1));
+ assertEquals(Integer.valueOf("130"), numbers.get(2));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(new MyListOfNumbersStrategy()).header("id")
+ .completionSize(3) // the test sends exactly 3 messages with the same id header
+ .to("mock:result");
+ }
+ };
+ }
+
+ // START SNIPPET: e1
+ /**
+ * Our strategy just groups the message bodies into a list of integers.
+ */
+ public final class MyListOfNumbersStrategy extends
AbstractListAggregationStrategy<Integer> {
+
+ @Override
+ public Integer getValue(Exchange exchange) {
+ // the message body contains a number (sent as a String in this test), so read it as an Integer
+ return exchange.getIn().getBody(Integer.class);
+ }
+ }
+ // END SNIPPET: e1
+
+}
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date