CAMEL-6650: AggregationStrategy - Allow to use a pojo with no Camel API 
dependencies. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5aa3ba6c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5aa3ba6c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5aa3ba6c

Branch: refs/heads/master
Commit: 5aa3ba6cce76755c0b4b0b0bd9ef7c34aa1de694
Parents: ee1fd85
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon Aug 19 09:08:22 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Aug 20 08:29:57 2013 +0200

----------------------------------------------------------------------
 .../AggregationStrategyBeanAdapter.java         | 241 +++++++++++++++++++
 .../aggregate/AggregationStrategyBeanInfo.java  | 118 +++++++++
 .../AggregationStrategyMethodInfo.java          |  72 ++++++
 ...tegyBeanAdapterAllowNullOldExchangeTest.java |  69 ++++++
 ...nStrategyBeanAdapterNonStaticMethodTest.java |  64 +++++
 ...egationStrategyBeanAdapterOneMethodTest.java |  65 +++++
 ...apterPollEnrichAllowNullNewExchangeTest.java |  63 +++++
 ...gationStrategyBeanAdapterPollEnrichTest.java |  68 ++++++
 ...tionStrategyBeanAdapterStaticMethodTest.java |  64 +++++
 .../AggregationStrategyBeanAdapterTest.java     |  65 +++++
 ...BeanAdapterWithHeadersAndPropertiesTest.java |  74 ++++++
 ...ationStrategyBeanAdapterWithHeadersTest.java |  69 ++++++
 12 files changed, 1032 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanAdapter.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanAdapter.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanAdapter.java
new file mode 100644
index 0000000..21d8199
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanAdapter.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * An {@link AggregationStrategy} that adapts to a POJO.
+ * <p/>
+ * This allows end users to use POJOs for the aggregation logic, instead of having to implement the
+ * Camel API {@link AggregationStrategy}.
+ * <p/>
+ * The method to invoke is resolved when this service is started: it must be public, must not return
+ * <tt>void</tt>, must not be a bridge method and must not be one of the public methods of
+ * {@link Object} or {@link Proxy}. Exactly one such method (optionally narrowed by name) must exist.
+ * If the method is not static and no pojo instance was given, an instance is created using the
+ * injector of the {@link CamelContext}.
+ */
+public final class AggregationStrategyBeanAdapter extends ServiceSupport implements AggregationStrategy, CamelContextAware {
+
+    // TODO: Add parameter bindings for:
+    // - @Header / @Property / @XPath etc
+    // - CamelContext, Registry, etc.
+    // TODO: Add DSL support in Java DSL
+    // TODO: Add DSL support in XML DSL
+
+    private static final List<Method> EXCLUDED_METHODS = new ArrayList<Method>();
+    private CamelContext camelContext;
+    private Object pojo;
+    private final Class<?> type;
+    private String methodName;
+    private boolean allowNullOldExchange;
+    private boolean allowNullNewExchange;
+    // assigned last in doStart, read by aggregate
+    private volatile AggregationStrategyMethodInfo mi;
+
+    static {
+        // exclude all java.lang.Object methods as we do not want to invoke them
+        EXCLUDED_METHODS.addAll(Arrays.asList(Object.class.getMethods()));
+        // exclude all java.lang.reflect.Proxy methods as we do not want to invoke them
+        EXCLUDED_METHODS.addAll(Arrays.asList(Proxy.class.getMethods()));
+    }
+
+    /**
+     * Creates this adapter.
+     *
+     * @param pojo the pojo to use.
+     */
+    public AggregationStrategyBeanAdapter(Object pojo) {
+        this(pojo, null);
+    }
+
+    /**
+     * Creates this adapter.
+     *
+     * @param type the class type of the pojo
+     */
+    public AggregationStrategyBeanAdapter(Class<?> type) {
+        this(type, null);
+    }
+
+    /**
+     * Creates this adapter.
+     *
+     * @param pojo the pojo to use.
+     * @param methodName the name of the method to call, or <tt>null</tt> to auto-detect the method
+     * @throws IllegalArgumentException if the pojo is <tt>null</tt>
+     */
+    public AggregationStrategyBeanAdapter(Object pojo, String methodName) {
+        if (pojo == null) {
+            // fail with a clear message instead of a bare NullPointerException from pojo.getClass()
+            throw new IllegalArgumentException("pojo must be specified");
+        }
+        this.pojo = pojo;
+        this.type = pojo.getClass();
+        this.methodName = methodName;
+    }
+
+    /**
+     * Creates this adapter.
+     *
+     * @param type the class type of the pojo
+     * @param methodName the name of the method to call, or <tt>null</tt> to auto-detect the method
+     * @throws IllegalArgumentException if the type is <tt>null</tt>
+     */
+    public AggregationStrategyBeanAdapter(Class<?> type, String methodName) {
+        if (type == null) {
+            // otherwise the failure would only surface later as a NullPointerException in doStart
+            throw new IllegalArgumentException("type must be specified");
+        }
+        this.type = type;
+        this.pojo = null;
+        this.methodName = methodName;
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public String getMethodName() {
+        return methodName;
+    }
+
+    public void setMethodName(String methodName) {
+        this.methodName = methodName;
+    }
+
+    public boolean isAllowNullOldExchange() {
+        return allowNullOldExchange;
+    }
+
+    public void setAllowNullOldExchange(boolean allowNullOldExchange) {
+        this.allowNullOldExchange = allowNullOldExchange;
+    }
+
+    public boolean isAllowNullNewExchange() {
+        return allowNullNewExchange;
+    }
+
+    public void setAllowNullNewExchange(boolean allowNullNewExchange) {
+        this.allowNullNewExchange = allowNullNewExchange;
+    }
+
+    /**
+     * Aggregates the two exchanges by invoking the pojo method.
+     * <p/>
+     * Unless the corresponding <tt>allowNull...</tt> option is enabled, the pojo is not invoked when
+     * one of the exchanges is <tt>null</tt>; the other exchange is returned as-is. A non-null result
+     * from the pojo becomes the IN body of the returned exchange, and an exception from the invocation
+     * is set on the returned exchange.
+     *
+     * @param oldExchange the existing aggregated exchange, may be <tt>null</tt>
+     * @param newExchange the incoming exchange, may be <tt>null</tt>
+     * @return the old exchange if it is not <tt>null</tt>, otherwise the new exchange
+     */
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        if (!allowNullOldExchange && oldExchange == null) {
+            return newExchange;
+        }
+        if (!allowNullNewExchange && newExchange == null) {
+            return oldExchange;
+        }
+        if (oldExchange == null && newExchange == null) {
+            // nothing to aggregate and no exchange to carry a result or an exception
+            return null;
+        }
+
+        // the old exchange, when present, carries the aggregated result
+        Exchange answer = oldExchange != null ? oldExchange : newExchange;
+        try {
+            Object out = mi.invoke(pojo, oldExchange, newExchange);
+            if (out != null) {
+                answer.getIn().setBody(out);
+            }
+        } catch (Exception e) {
+            answer.setException(e);
+        }
+        return answer;
+    }
+
+    /**
+     * Validates whether the given method is valid.
+     *
+     * @param method  the method
+     * @return true if valid, false to skip the method
+     */
+    protected boolean isValidMethod(Method method) {
+        // must not be in the excluded list
+        if (EXCLUDED_METHODS.contains(method)) {
+            return false;
+        }
+
+        // must be a public method
+        if (!Modifier.isPublic(method.getModifiers())) {
+            return false;
+        }
+
+        // return type must not be void and it should not be a bridge method
+        return !method.getReturnType().equals(Void.TYPE) && !method.isBridge();
+    }
+
+    private static boolean isStaticMethod(Method method) {
+        return Modifier.isStatic(method.getModifiers());
+    }
+
+    /**
+     * Finds the single valid method on the bean type, restricted to the configured method name if any.
+     *
+     * @return the method, or <tt>null</tt> if there is no valid candidate
+     * @throws IllegalArgumentException if there is more than one valid candidate
+     */
+    private Method findMethod() {
+        Method found = null;
+        for (Method method : type.getMethods()) {
+            if (!isValidMethod(method)) {
+                continue;
+            }
+            if (methodName != null && !method.getName().equals(methodName)) {
+                continue;
+            }
+            if (found != null) {
+                if (methodName != null) {
+                    throw new IllegalArgumentException("The bean " + type + " has 2 or more methods with the name " + methodName);
+                }
+                throw new IllegalArgumentException("The bean " + type + " has 2 or more methods and no explicit method name was configured.");
+            }
+            found = method;
+        }
+        return found;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        Method found = findMethod();
+        if (found == null) {
+            if (methodName != null) {
+                throw new UnsupportedOperationException("Cannot find a valid method with name: " + methodName + " on bean type: " + type);
+            }
+            // no name was configured, so do not report "with name: null"
+            throw new UnsupportedOperationException("Cannot find a valid method on bean type: " + type);
+        }
+
+        // if its not a static method then we must have an instance of the pojo
+        if (!isStaticMethod(found) && pojo == null) {
+            if (camelContext == null) {
+                throw new IllegalArgumentException("CamelContext must be specified to create an instance of " + type);
+            }
+            pojo = camelContext.getInjector().newInstance(type);
+        }
+
+        // create the method info which has adapted to the pojo
+        AggregationStrategyBeanInfo bi = new AggregationStrategyBeanInfo(getCamelContext(), type, found);
+        mi = bi.createMethodInfo();
+
+        // in case the pojo is a service
+        ServiceHelper.startService(pojo);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(pojo);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanInfo.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanInfo.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanInfo.java
new file mode 100644
index 0000000..f898be7
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanInfo.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Expression;
+import org.apache.camel.builder.ExpressionBuilder;
+import org.apache.camel.component.bean.ParameterInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class information about the POJO method to call when using the {@link AggregationStrategyBeanAdapter}.
+ * <p/>
+ * The method parameters are split into two halves: the first half is bound to the old exchange and
+ * the second half to the new exchange. Within each half the parameters are bound, in order, to the
+ * message body, the message headers and the exchange properties.
+ */
+public class AggregationStrategyBeanInfo {
+
+    // TODO: We could potential merge this logic into AggregationStrategyMethodInfo and only have 1 class
+
+    private static final Logger LOG = LoggerFactory.getLogger(AggregationStrategyBeanInfo.class);
+
+    // body + headers + properties
+    private static final int MAX_PARAMETERS_PER_EXCHANGE = 3;
+
+    private final CamelContext camelContext;
+    private final Class<?> type;
+    private final Method method;
+
+    public AggregationStrategyBeanInfo(CamelContext camelContext, Class<?> type, Method method) {
+        this.camelContext = camelContext;
+        this.type = type;
+        this.method = method;
+    }
+
+    /**
+     * Creates the method info with the parameter bindings for the old and new exchange.
+     *
+     * @return the method info
+     * @throws IllegalArgumentException if the method has fewer than two parameters, an odd number of
+     *                                  parameters, more than six parameters, or annotated parameters
+     */
+    protected AggregationStrategyMethodInfo createMethodInfo() {
+        Class<?>[] parameterTypes = method.getParameterTypes();
+
+        int size = parameterTypes.length;
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Creating MethodInfo for class: {} method: {} having {} parameters", new Object[]{type, method, size});
+        }
+
+        // must have an even number of parameters, so they can be split evenly between old and new exchange
+        if (size < 2) {
+            throw new IllegalArgumentException("The method " + method.getName() + " must have at least two parameters, has: " + size);
+        } else if (size % 2 != 0) {
+            throw new IllegalArgumentException("The method " + method.getName() + " must have equal number of parameters, has: " + size);
+        } else if (size > 2 * MAX_PARAMETERS_PER_EXCHANGE) {
+            // further parameters cannot be bound, and the invocation would fail later with a wrong number of arguments
+            throw new IllegalArgumentException("The method " + method.getName() + " must have at most "
+                    + (2 * MAX_PARAMETERS_PER_EXCHANGE) + " parameters, has: " + size);
+        }
+
+        // must not have annotations as they are not supported (yet)
+        // check the annotations declared on the parameters, not the annotations on the parameter's class
+        java.lang.annotation.Annotation[][] parameterAnnotations = method.getParameterAnnotations();
+        for (int i = 0; i < parameterAnnotations.length; i++) {
+            if (parameterAnnotations[i].length > 0) {
+                throw new IllegalArgumentException("Parameter annotations at index " + i + " is not supported on method: " + method);
+            }
+        }
+
+        int half = size / 2;
+        List<ParameterInfo> oldParameters = bindParameters(parameterTypes, 0, half);
+        List<ParameterInfo> newParameters = bindParameters(parameterTypes, half, size);
+
+        return new AggregationStrategyMethodInfo(camelContext, type, method, oldParameters, newParameters);
+    }
+
+    /**
+     * Binds the parameters in the range <tt>[from, to)</tt> to body, headers and properties, in that order.
+     * The range must not span more than three parameters.
+     */
+    private static List<ParameterInfo> bindParameters(Class<?>[] parameterTypes, int from, int to) {
+        List<ParameterInfo> answer = new ArrayList<ParameterInfo>(to - from);
+        for (int i = from; i < to; i++) {
+            Class<?> parameterType = parameterTypes[i];
+            int position = i - from;
+
+            Expression expression;
+            if (position == 0) {
+                // the first parameter is the body
+                expression = ExpressionBuilder.mandatoryBodyExpression(parameterType);
+            } else if (position == 1) {
+                // the 2nd parameter is the headers
+                expression = ExpressionBuilder.headersExpression();
+            } else {
+                // the 3rd parameter is the properties
+                expression = ExpressionBuilder.propertiesExpression();
+            }
+            answer.add(new ParameterInfo(i, parameterType, null, expression));
+        }
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyMethodInfo.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyMethodInfo.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyMethodInfo.java
new file mode 100644
index 0000000..ec92ca9
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyMethodInfo.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.bean.ParameterInfo;
+
+/**
+ * Method information about the POJO method to call when using the {@link AggregationStrategyBeanAdapter}.
+ * <p/>
+ * Holds the method together with the parameter bindings for the old and the new exchange, and
+ * performs the reflective invocation.
+ */
+public class AggregationStrategyMethodInfo {
+
+    private final CamelContext camelContext;
+    private final Class<?> type;
+    private final Method method;
+    private final List<ParameterInfo> oldParameters;
+    private final List<ParameterInfo> newParameters;
+
+    public AggregationStrategyMethodInfo(CamelContext camelContext, Class<?> type, Method method,
+                                         List<ParameterInfo> oldParameters, List<ParameterInfo> newParameters) {
+        this.camelContext = camelContext;
+        this.type = type;
+        this.method = method;
+        this.oldParameters = oldParameters;
+        this.newParameters = newParameters;
+    }
+
+    /**
+     * Invokes the method, with the arguments evaluated from the old exchange followed by the
+     * arguments evaluated from the new exchange.
+     *
+     * @param pojo        the instance to invoke the method on
+     * @param oldExchange the old exchange, if <tt>null</tt> then its arguments are all <tt>null</tt>
+     * @param newExchange the new exchange, if <tt>null</tt> then its arguments are all <tt>null</tt>
+     * @return the value returned by the method
+     * @throws Exception the exception thrown by the method itself, or a failure evaluating the
+     *                   parameters or invoking the method
+     */
+    public Object invoke(Object pojo, Exchange oldExchange, Exchange newExchange) throws Exception {
+        // evaluate the parameters
+        List<Object> list = new ArrayList<Object>(oldParameters.size() + newParameters.size());
+        evaluate(oldParameters, oldExchange, list);
+        evaluate(newParameters, newExchange, list);
+
+        Object[] args = list.toArray();
+        try {
+            return method.invoke(pojo, args);
+        } catch (java.lang.reflect.InvocationTargetException e) {
+            // report the exception thrown by the pojo itself rather than the reflection wrapper
+            Throwable cause = e.getCause();
+            if (cause instanceof Exception) {
+                throw (Exception) cause;
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Evaluates each parameter against the exchange and appends the value to the list,
+     * appending <tt>null</tt> values when there is no exchange.
+     */
+    private static void evaluate(List<ParameterInfo> parameters, Exchange exchange, List<Object> values) {
+        for (ParameterInfo info : parameters) {
+            if (exchange != null) {
+                Object value = info.getExpression().evaluate(exchange, info.getType());
+                values.add(value);
+            } else {
+                // use a null value as the exchange is null
+                values.add(null);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterAllowNullOldExchangeTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterAllowNullOldExchangeTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterAllowNullOldExchangeTest.java
new file mode 100644
index 0000000..668ef03
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterAllowNullOldExchangeTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+
+/**
+ * Tests the {@link AggregationStrategyBeanAdapter} with null old exchanges allowed, so the pojo
+ * is also invoked for the first message, where the existing body is <tt>null</tt>.
+ */
+public class AggregationStrategyBeanAdapterAllowNullOldExchangeTest extends ContextTestSupport {
+
+    private MyBodyAppender bodyAppender = new MyBodyAppender();
+    private AggregationStrategyBeanAdapter strategy;
+
+    public void testAggregate() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("OldWasNullABC");
+
+        for (String body : new String[]{"A", "B", "C"}) {
+            template.sendBody("direct:start", body);
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // adapt the pojo and let it see the null old exchange as well
+                AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(bodyAppender, "append");
+                adapter.setCamelContext(getContext());
+                adapter.setAllowNullOldExchange(true);
+                strategy = adapter;
+
+                from("direct:start")
+                    .aggregate(constant(true), strategy)
+                        .completionSize(3)
+                        .to("mock:result");
+            }
+        };
+    }
+
+    public static final class MyBodyAppender {
+
+        /**
+         * Appends the next body to the existing body, marking the case where there is no existing body.
+         */
+        public String append(String existing, String next) {
+            if (existing == null) {
+                return "OldWasNull" + next;
+            }
+            return next == null ? existing : existing + next;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterNonStaticMethodTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterNonStaticMethodTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterNonStaticMethodTest.java
new file mode 100644
index 0000000..abf633d
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterNonStaticMethodTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+
+/**
+ * Tests the {@link AggregationStrategyBeanAdapter} configured with a class type and a
+ * non-static method, instead of a pojo instance.
+ */
+public class AggregationStrategyBeanAdapterNonStaticMethodTest extends ContextTestSupport {
+
+    private AggregationStrategyBeanAdapter strategy;
+
+    public void testAggregate() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABC");
+
+        for (String body : new String[]{"A", "B", "C"}) {
+            template.sendBody("direct:start", body);
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // only the class is given, no pojo instance
+                AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(MyBodyAppender.class, "append");
+                adapter.setCamelContext(getContext());
+                strategy = adapter;
+
+                from("direct:start")
+                    .aggregate(constant(true), strategy)
+                        .completionSize(3)
+                        .to("mock:result");
+            }
+        };
+    }
+
+    public static final class MyBodyAppender {
+
+        /**
+         * Appends the next body to the existing body, if there is a next body.
+         */
+        public String append(String existing, String next) {
+            return next == null ? existing : existing + next;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterOneMethodTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterOneMethodTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterOneMethodTest.java
new file mode 100644
index 0000000..f4c8576
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterOneMethodTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+
+/**
+ * Tests the {@link AggregationStrategyBeanAdapter} without an explicit method name,
+ * using a pojo that has a single candidate method.
+ */
+public class AggregationStrategyBeanAdapterOneMethodTest extends ContextTestSupport {
+
+    private MyBodyAppender bodyAppender = new MyBodyAppender();
+    private AggregationStrategyBeanAdapter strategy;
+
+    public void testAggregate() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABC");
+
+        for (String body : new String[]{"A", "B", "C"}) {
+            template.sendBody("direct:start", body);
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // no method name configured
+                AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(bodyAppender);
+                adapter.setCamelContext(getContext());
+                strategy = adapter;
+
+                from("direct:start")
+                    .aggregate(constant(true), strategy)
+                        .completionSize(3)
+                        .to("mock:result");
+            }
+        };
+    }
+
+    public static final class MyBodyAppender {
+
+        /**
+         * Appends the next body to the existing body, if there is a next body.
+         */
+        public String append(String existing, String next) {
+            return next == null ? existing : existing + next;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichAllowNullNewExchangeTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichAllowNullNewExchangeTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichAllowNullNewExchangeTest.java
new file mode 100644
index 0000000..54ffabe
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichAllowNullNewExchangeTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+
+/**
+ * Tests the {@link AggregationStrategyBeanAdapter} with poll enrich and null new exchanges
+ * allowed, so the pojo is also invoked when no data was polled.
+ */
+public class AggregationStrategyBeanAdapterPollEnrichAllowNullNewExchangeTest extends ContextTestSupport {
+
+    private MyBodyAppender bodyAppender = new MyBodyAppender();
+    private AggregationStrategyBeanAdapter strategy;
+
+    public void testNoData() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("NewWasNullA");
+
+        template.sendBody("direct:start", "A");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // adapt the pojo and let it see the null new exchange as well
+                AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(bodyAppender, "append");
+                adapter.setCamelContext(getContext());
+                adapter.setAllowNullNewExchange(true);
+                strategy = adapter;
+
+                from("direct:start")
+                    .pollEnrich("seda:foo", 10, strategy)
+                        .to("mock:result");
+            }
+        };
+    }
+
+    public static final class MyBodyAppender {
+
+        /**
+         * Appends the next body to the existing body, marking the case where there is no next body.
+         */
+        public String append(String existing, String next) {
+            return next == null ? "NewWasNull" + existing : existing + next;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichTest.java
new file mode 100644
index 0000000..07deb8f
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+
+public class AggregationStrategyBeanAdapterPollEnrichTest extends ContextTestSupport {
+
+    private MyBodyAppender appender = new MyBodyAppender(); // plain bean wrapped by the adapter in configure()
+    private AggregationStrategyBeanAdapter myStrategy; // created in configure()
+
+    public void testNoData() throws Exception { // nothing is sent to seda:foo before the poll
+        getMockEndpoint("mock:result").expectedBodiesReceived("A"); // body expected unchanged (not "Anull")
+
+        template.sendBody("direct:start", "A");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testData() throws Exception {
+        template.sendBody("seda:foo", "B"); // queue "B" first so the poll has something to pick up
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("AB"); // append("A", "B")
+
+        template.sendBody("direct:start", "A");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                myStrategy = new AggregationStrategyBeanAdapter(appender, "append"); // bean instance + method name
+                myStrategy.setCamelContext(getContext());
+
+                from("direct:start")
+                    .pollEnrich("seda:foo", 100, myStrategy)
+                        .to("mock:result");
+            }
+        };
+    }
+
+    public static final class MyBodyAppender {
+
+        public String append(String existing, String next) { // no null check: next is concatenated as-is
+            return existing + next;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterStaticMethodTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterStaticMethodTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterStaticMethodTest.java
new file mode 100644
index 0000000..16ed73c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterStaticMethodTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+
+public class AggregationStrategyBeanAdapterStaticMethodTest extends ContextTestSupport {
+
+    private AggregationStrategyBeanAdapter myStrategy; // created in configure()
+
+    public void testAggregate() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABC"); // the three bodies joined in send order
+
+        template.sendBody("direct:start", "A");
+        template.sendBody("direct:start", "B");
+        template.sendBody("direct:start", "C");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                myStrategy = new AggregationStrategyBeanAdapter(MyBodyAppender.class, "append"); // class, not instance: append is static
+                myStrategy.setCamelContext(getContext());
+
+                from("direct:start")
+                    .aggregate(constant(true), myStrategy)
+                        .completionSize(3) // matches the three messages sent by testAggregate()
+                        .to("mock:result");
+            }
+        };
+    }
+
+    public static final class MyBodyAppender {
+
+        public static String append(String existing, String next) { // static variant of the appender
+            if (next != null) {
+                return existing + next;
+            } else {
+                return existing; // null next leaves the existing body as-is
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterTest.java
new file mode 100644
index 0000000..cd47f1f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+
+public class AggregationStrategyBeanAdapterTest extends ContextTestSupport {
+
+    private MyBodyAppender appender = new MyBodyAppender(); // plain bean wrapped by the adapter in configure()
+    private AggregationStrategyBeanAdapter myStrategy; // created in configure()
+
+    public void testAggregate() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABC"); // the three bodies joined in send order
+
+        template.sendBody("direct:start", "A");
+        template.sendBody("direct:start", "B");
+        template.sendBody("direct:start", "C");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                myStrategy = new AggregationStrategyBeanAdapter(appender, "append"); // bean instance + method name
+                myStrategy.setCamelContext(getContext());
+
+                from("direct:start")
+                    .aggregate(constant(true), myStrategy)
+                        .completionSize(3) // matches the three messages sent by testAggregate()
+                        .to("mock:result");
+            }
+        };
+    }
+
+    public static final class MyBodyAppender {
+
+        public String append(String existing, String next) { // no Camel types in the signature
+            if (next != null) {
+                return existing + next;
+            } else {
+                return existing; // null next leaves the existing body as-is
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersAndPropertiesTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersAndPropertiesTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersAndPropertiesTest.java
new file mode 100644
index 0000000..49688d5
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersAndPropertiesTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.Map;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+
+public class AggregationStrategyBeanAdapterWithHeadersAndPropertiesTest extends ContextTestSupport {
+
+    private MyBodyAppender appender = new MyBodyAppender(); // plain bean wrapped by the adapter in configure()
+    private AggregationStrategyBeanAdapter myStrategy; // created in configure()
+
+    public void testAggregate() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABC"); // the three bodies joined in send order
+        getMockEndpoint("mock:result").expectedHeaderReceived("foo", "yesyesyes"); // "yes" from each of the three messages
+        getMockEndpoint("mock:result").expectedPropertyReceived("count", 6); // 1 + 2 + 3
+
+        template.sendBodyAndProperty("direct:start", "A", "count", 1);
+        template.sendBodyAndProperty("direct:start", "B", "count", 2);
+        template.sendBodyAndProperty("direct:start", "C", "count", 3);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                myStrategy = new AggregationStrategyBeanAdapter(appender, "appendWithHeadersAndProperties");
+                myStrategy.setCamelContext(getContext());
+
+                from("direct:start")
+                    .setHeader("foo", constant("yes")) // every incoming message carries foo=yes
+                    .aggregate(constant(true), myStrategy)
+                        .completionSize(3) // matches the three messages sent by testAggregate()
+                        .to("mock:result");
+            }
+        };
+    }
+
+    public static final class MyBodyAppender {
+
+        public String appendWithHeadersAndProperties(String existing, Map oldHeaders, Map oldProperties,
+                                                     String next, Map newHeaders, Map newProperties) {
+            if (next != null) {
+                Integer count = (Integer) oldProperties.get("count") + (Integer) newProperties.get("count");
+                oldProperties.put("count", count); // running sum is kept in the old properties map
+                String foo = oldHeaders.get("foo") + (String) newHeaders.get("foo");
+                oldHeaders.put("foo", foo); // concatenated value is kept in the old headers map
+                return existing + next;
+            } else {
+                return existing; // null next: maps and body are left untouched
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersTest.java
new file mode 100644
index 0000000..4230050
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.Map;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+
+public class AggregationStrategyBeanAdapterWithHeadersTest extends ContextTestSupport {
+
+    private MyBodyAppender appender = new MyBodyAppender(); // plain bean wrapped by the adapter in configure()
+    private AggregationStrategyBeanAdapter myStrategy; // created in configure()
+
+    public void testAggregate() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABC"); // the three bodies joined in send order
+        getMockEndpoint("mock:result").expectedHeaderReceived("count", 6); // 1 + 2 + 3
+
+        template.sendBodyAndHeader("direct:start", "A", "count", 1);
+        template.sendBodyAndHeader("direct:start", "B", "count", 2);
+        template.sendBodyAndHeader("direct:start", "C", "count", 3);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                myStrategy = new AggregationStrategyBeanAdapter(appender, "appendWithHeaders"); // bean instance + method name
+                myStrategy.setCamelContext(getContext());
+
+                from("direct:start")
+                    .aggregate(constant(true), myStrategy)
+                        .completionSize(3) // matches the three messages sent by testAggregate()
+                        .to("mock:result");
+            }
+        };
+    }
+
+    public static final class MyBodyAppender {
+
+        public String appendWithHeaders(String existing, Map oldHeaders, String next, Map newHeaders) {
+            if (next != null) {
+                Integer count = (Integer) oldHeaders.get("count") + (Integer) newHeaders.get("count");
+                oldHeaders.put("count", count); // running sum is kept in the old headers map
+                return existing + next;
+            } else {
+                return existing; // null next: headers and body are left untouched
+            }
+        }
+    }
+}

Reply via email to