CAMEL-6650: AggregationStrategy - Allow to use a pojo with no Camel API dependencies. Work in progress.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5aa3ba6c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5aa3ba6c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5aa3ba6c Branch: refs/heads/master Commit: 5aa3ba6cce76755c0b4b0b0bd9ef7c34aa1de694 Parents: ee1fd85 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Aug 19 09:08:22 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Aug 20 08:29:57 2013 +0200 ---------------------------------------------------------------------- .../AggregationStrategyBeanAdapter.java | 241 +++++++++++++++++++ .../aggregate/AggregationStrategyBeanInfo.java | 118 +++++++++ .../AggregationStrategyMethodInfo.java | 72 ++++++ ...tegyBeanAdapterAllowNullOldExchangeTest.java | 69 ++++++ ...nStrategyBeanAdapterNonStaticMethodTest.java | 64 +++++ ...egationStrategyBeanAdapterOneMethodTest.java | 65 +++++ ...apterPollEnrichAllowNullNewExchangeTest.java | 63 +++++ ...gationStrategyBeanAdapterPollEnrichTest.java | 68 ++++++ ...tionStrategyBeanAdapterStaticMethodTest.java | 64 +++++ .../AggregationStrategyBeanAdapterTest.java | 65 +++++ ...BeanAdapterWithHeadersAndPropertiesTest.java | 74 ++++++ ...ationStrategyBeanAdapterWithHeadersTest.java | 69 ++++++ 12 files changed, 1032 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanAdapter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanAdapter.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanAdapter.java new file mode 100644 index 0000000..21d8199 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanAdapter.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate; + +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ServiceHelper; + +/** + * An {@link AggregationStrategy} that adapts to a POJO. + * <p/> + * This allows end users to use POJOs for the aggregation logic, instead of having to implement the + * Camel API {@link AggregationStrategy}. + */ +public final class AggregationStrategyBeanAdapter extends ServiceSupport implements AggregationStrategy, CamelContextAware { + + // TODO: Add parameter bindings for: + // - @Header / @Property / @XPath etc + // - CamelContext, Registry, etc. + // TODO: Add DSL support in Java DSL + // TODO: Add DSL support in XML DSL + + private static final List<Method> EXCLUDED_METHODS = new ArrayList<Method>(); + private CamelContext camelContext; + private Object pojo; + private final Class<?> type; + private String methodName; + private boolean allowNullOldExchange = false; + private boolean allowNullNewExchange = false; + private volatile AggregationStrategyMethodInfo mi; + + static { + // exclude all java.lang.Object methods as we dont want to invoke them + EXCLUDED_METHODS.addAll(Arrays.asList(Object.class.getMethods())); + // exclude all java.lang.reflect.Proxy methods as we dont want to invoke them + EXCLUDED_METHODS.addAll(Arrays.asList(Proxy.class.getMethods())); + } + + /** + * Creates this adapter. + * + * @param pojo the pojo to use. + */ + public AggregationStrategyBeanAdapter(Object pojo) { + this(pojo, null); + } + + /** + * Creates this adapter. + * + * @param type the class type of the pojo + */ + public AggregationStrategyBeanAdapter(Class<?> type) { + this(type, null); + } + + /** + * Creates this adapter. + * + * @param pojo the pojo to use. + * @param methodName the name of the method to call + */ + public AggregationStrategyBeanAdapter(Object pojo, String methodName) { + this.pojo = pojo; + this.type = pojo.getClass(); + this.methodName = methodName; + } + + /** + * Creates this adapter. + * + * @param type the class type of the pojo + * @param methodName the name of the method to call + */ + public AggregationStrategyBeanAdapter(Class<?> type, String methodName) { + this.type = type; + this.pojo = null; + this.methodName = methodName; + } + + public CamelContext getCamelContext() { + return camelContext; + } + + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + public String getMethodName() { + return methodName; + } + + public void setMethodName(String methodName) { + this.methodName = methodName; + } + + public boolean isAllowNullOldExchange() { + return allowNullOldExchange; + } + + public void setAllowNullOldExchange(boolean allowNullOldExchange) { + this.allowNullOldExchange = allowNullOldExchange; + } + + public boolean isAllowNullNewExchange() { + return allowNullNewExchange; + } + + public void setAllowNullNewExchange(boolean allowNullNewExchange) { + this.allowNullNewExchange = allowNullNewExchange; + } + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (!allowNullOldExchange && oldExchange == null) { + return newExchange; + } + if (!allowNullNewExchange && newExchange == null) { + return oldExchange; + } + + try { + Object out = mi.invoke(pojo, oldExchange, newExchange); + if (out != null) { + if (oldExchange != null) { + oldExchange.getIn().setBody(out); + } else { + newExchange.getIn().setBody(out); + } + } + } catch (Exception e) { + if (oldExchange != null) { + oldExchange.setException(e); + } else { + newExchange.setException(e); + } + } + return oldExchange != null ? oldExchange : newExchange; + } + + /** + * Validates whether the given method is valid. + * + * @param method the method + * @return true if valid, false to skip the method + */ + protected boolean isValidMethod(Method method) { + // must not be in the excluded list + for (Method excluded : EXCLUDED_METHODS) { + if (method.equals(excluded)) { + return false; + } + } + + // must be a public method + if (!Modifier.isPublic(method.getModifiers())) { + return false; + } + + // return type must not be void and it should not be a bridge method + if (method.getReturnType().equals(Void.TYPE) || method.isBridge()) { + return false; + } + + return true; + } + + private static boolean isStaticMethod(Method method) { + return Modifier.isStatic(method.getModifiers()); + } + + @Override + protected void doStart() throws Exception { + Method found = null; + if (methodName != null) { + for (Method method : type.getMethods()) { + if (isValidMethod(method) && method.getName().equals(methodName)) { + if (found == null) { + found = method; + } else { + throw new IllegalArgumentException("The bean " + type + " has 2 or more methods with the name " + methodName); + } + } + } + } else { + for (Method method : type.getMethods()) { + if (isValidMethod(method)) { + if (found == null) { + found = method; + } else { + throw new IllegalArgumentException("The bean " + type + " has 2 or more methods and no explicit method name was configured."); + } + } + } + } + + if (found == null) { + throw new UnsupportedOperationException("Cannot find a valid method with name: " + methodName + " on bean type: " + type); + } + + // if its not a static method then we must have an instance of the pojo + if (!isStaticMethod(found) && pojo == null) { + pojo = camelContext.getInjector().newInstance(type); + } + + // create the method info which has adapted to the pojo + AggregationStrategyBeanInfo bi = new AggregationStrategyBeanInfo(getCamelContext(), type, found); + mi = bi.createMethodInfo(); + + // in case the pojo is a service + ServiceHelper.startService(pojo); + } + + @Override + protected void doStop() throws Exception { + ServiceHelper.stopService(pojo); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanInfo.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanInfo.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanInfo.java new file mode 100644 index 0000000..f898be7 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyBeanInfo.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.CamelContext; +import org.apache.camel.Expression; +import org.apache.camel.builder.ExpressionBuilder; +import org.apache.camel.component.bean.ParameterInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class information about the POJO method to call when using the {@link AggregationStrategyBeanAdapter}. + */ +public class AggregationStrategyBeanInfo { + + // TODO: We could potential merge this logic into AggregationStrategyMethodInfo and only have 1 class + + private static final Logger LOG = LoggerFactory.getLogger(AggregationStrategyBeanInfo.class); + + private final CamelContext camelContext; + private final Class<?> type; + private final Method method; + + public AggregationStrategyBeanInfo(CamelContext camelContext, Class<?> type, Method method) { + this.camelContext = camelContext; + this.type = type; + this.method = method; + } + + protected AggregationStrategyMethodInfo createMethodInfo() { + Class<?>[] parameterTypes = method.getParameterTypes(); + + int size = parameterTypes.length; + if (LOG.isTraceEnabled()) { + LOG.trace("Creating MethodInfo for class: {} method: {} having {} parameters", new Object[]{type, method, size}); + } + + // must have equal number of parameters + if (size < 2) { + throw new IllegalArgumentException("The method " + method.getName() + " must have at least two parameters, has: " + size); + } else if (size % 2 != 0) { + throw new IllegalArgumentException("The method " + method.getName() + " must have equal number of parameters, has: " + size); + } + + // must not have annotations as they are not supported (yet) + for (int i = 0; i < size; i++) { + Class<?> type = parameterTypes[i]; + if (type.getAnnotations().length > 0) { + throw new IllegalArgumentException("Parameter annotations at index " + i + " is not supported on method: " + method); + } + } + + List<ParameterInfo> oldParameters = new ArrayList<ParameterInfo>(); + List<ParameterInfo> newParameters = new ArrayList<ParameterInfo>(); + + for (int i = 0; i < size / 2; i++) { + Class<?> oldType = parameterTypes[i]; + if (oldParameters.size() == 0) { + // the first parameter is the body + Expression oldBody = ExpressionBuilder.mandatoryBodyExpression(oldType); + ParameterInfo info = new ParameterInfo(i, oldType, null, oldBody); + oldParameters.add(info); + } else if (oldParameters.size() == 1) { + // the 2nd parameter is the headers + Expression oldHeaders = ExpressionBuilder.headersExpression(); + ParameterInfo info = new ParameterInfo(i, oldType, null, oldHeaders); + oldParameters.add(info); + } else if (oldParameters.size() == 2) { + // the 3rd parameter is the properties + Expression oldProperties = ExpressionBuilder.propertiesExpression(); + ParameterInfo info = new ParameterInfo(i, oldType, null, oldProperties); + oldParameters.add(info); + } + } + + for (int i = size / 2; i < size; i++) { + Class<?> newType = parameterTypes[i]; + if (newParameters.size() == 0) { + // the first parameter is the body + Expression newBody = ExpressionBuilder.mandatoryBodyExpression(newType); + ParameterInfo info = new ParameterInfo(i, newType, null, newBody); + newParameters.add(info); + } else if (newParameters.size() == 1) { + // the 2nd parameter is the headers + Expression newHeaders = ExpressionBuilder.headersExpression(); + ParameterInfo info = new ParameterInfo(i, newType, null, newHeaders); + newParameters.add(info); + } else if (newParameters.size() == 2) { + // the 3rd parameter is the properties + Expression newProperties = ExpressionBuilder.propertiesExpression(); + ParameterInfo info = new ParameterInfo(i, newType, null, newProperties); + newParameters.add(info); + } + } + + return new AggregationStrategyMethodInfo(camelContext, type, method, oldParameters, newParameters); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyMethodInfo.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyMethodInfo.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyMethodInfo.java new file mode 100644 index 0000000..ec92ca9 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategyMethodInfo.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.component.bean.ParameterInfo; + +/** + * Method information about the POJO method to call when using the {@link AggregationStrategyBeanAdapter}. + */ +public class AggregationStrategyMethodInfo { + + private final CamelContext camelContext; + private final Class<?> type; + private final Method method; + private final List<ParameterInfo> oldParameters; + private final List<ParameterInfo> newParameters; + + public AggregationStrategyMethodInfo(CamelContext camelContext, Class<?> type, Method method, + List<ParameterInfo> oldParameters, List<ParameterInfo> newParameters) { + this.camelContext = camelContext; + this.type = type; + this.method = method; + this.oldParameters = oldParameters; + this.newParameters = newParameters; + } + + public Object invoke(Object pojo, Exchange oldExchange, Exchange newExchange) throws Exception { + // evaluate the parameters + List<Object> list = new ArrayList<Object>(oldParameters.size() + newParameters.size()); + for (ParameterInfo info : oldParameters) { + if (oldExchange != null) { + Object value = info.getExpression().evaluate(oldExchange, info.getType()); + list.add(value); + } else { + // use a null value as oldExchange is null + list.add(null); + } + } + for (ParameterInfo info : newParameters) { + if (newExchange != null) { + Object value = info.getExpression().evaluate(newExchange, info.getType()); + list.add(value); + } else { + // use a null value as newExchange is null + list.add(null); + } + } + + Object[] args = list.toArray(); + return method.invoke(pojo, args); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterAllowNullOldExchangeTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterAllowNullOldExchangeTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterAllowNullOldExchangeTest.java new file mode 100644 index 0000000..668ef03 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterAllowNullOldExchangeTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; + +public class AggregationStrategyBeanAdapterAllowNullOldExchangeTest extends ContextTestSupport { + + private MyBodyAppender appender = new MyBodyAppender(); + private AggregationStrategyBeanAdapter myStrategy; + + public void testAggregate() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("OldWasNullABC"); + + template.sendBody("direct:start", "A"); + template.sendBody("direct:start", "B"); + template.sendBody("direct:start", "C"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + myStrategy = new AggregationStrategyBeanAdapter(appender, "append"); + myStrategy.setCamelContext(getContext()); + myStrategy.setAllowNullOldExchange(true); + + from("direct:start") + .aggregate(constant(true), myStrategy) + .completionSize(3) + .to("mock:result"); + } + }; + } + + public static final class MyBodyAppender { + + public String append(String existing, String next) { + if (existing == null) { + return "OldWasNull" + next; + } + if (next != null) { + return existing + next; + } else { + return existing; + } + } + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterNonStaticMethodTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterNonStaticMethodTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterNonStaticMethodTest.java new file mode 100644 index 0000000..abf633d --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterNonStaticMethodTest.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; + +public class AggregationStrategyBeanAdapterNonStaticMethodTest extends ContextTestSupport { + + private AggregationStrategyBeanAdapter myStrategy; + + public void testAggregate() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("ABC"); + + template.sendBody("direct:start", "A"); + template.sendBody("direct:start", "B"); + template.sendBody("direct:start", "C"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + myStrategy = new AggregationStrategyBeanAdapter(MyBodyAppender.class, "append"); + myStrategy.setCamelContext(getContext()); + + from("direct:start") + .aggregate(constant(true), myStrategy) + .completionSize(3) + .to("mock:result"); + } + }; + } + + public static final class MyBodyAppender { + + public String append(String existing, String next) { + if (next != null) { + return existing + next; + } else { + return existing; + } + } + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterOneMethodTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterOneMethodTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterOneMethodTest.java new file mode 100644 index 0000000..f4c8576 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterOneMethodTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; + +public class AggregationStrategyBeanAdapterOneMethodTest extends ContextTestSupport { + + private MyBodyAppender appender = new MyBodyAppender(); + private AggregationStrategyBeanAdapter myStrategy; + + public void testAggregate() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("ABC"); + + template.sendBody("direct:start", "A"); + template.sendBody("direct:start", "B"); + template.sendBody("direct:start", "C"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + myStrategy = new AggregationStrategyBeanAdapter(appender); + myStrategy.setCamelContext(getContext()); + + from("direct:start") + .aggregate(constant(true), myStrategy) + .completionSize(3) + .to("mock:result"); + } + }; + } + + public static final class MyBodyAppender { + + public String append(String existing, String next) { + if (next != null) { + return existing + next; + } else { + return existing; + } + } + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichAllowNullNewExchangeTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichAllowNullNewExchangeTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichAllowNullNewExchangeTest.java new file mode 100644 index 0000000..54ffabe --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichAllowNullNewExchangeTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; + +public class AggregationStrategyBeanAdapterPollEnrichAllowNullNewExchangeTest extends ContextTestSupport { + + private MyBodyAppender appender = new MyBodyAppender(); + private AggregationStrategyBeanAdapter myStrategy; + + public void testNoData() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("NewWasNullA"); + + template.sendBody("direct:start", "A"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + myStrategy = new AggregationStrategyBeanAdapter(appender, "append"); + myStrategy.setCamelContext(getContext()); + myStrategy.setAllowNullNewExchange(true); + + from("direct:start") + .pollEnrich("seda:foo", 10, myStrategy) + .to("mock:result"); + } + }; + } + + public static final class MyBodyAppender { + + public String append(String existing, String next) { + if (next == null) { + return "NewWasNull" + existing; + } else { + return existing + next; + } + } + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichTest.java new file mode 100644 index 0000000..07deb8f --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterPollEnrichTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; + +public class AggregationStrategyBeanAdapterPollEnrichTest extends ContextTestSupport { + + private MyBodyAppender appender = new MyBodyAppender(); + private AggregationStrategyBeanAdapter myStrategy; + + public void testNoData() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("A"); + + template.sendBody("direct:start", "A"); + + assertMockEndpointsSatisfied(); + } + + public void testData() throws Exception { + template.sendBody("seda:foo", "B"); + + getMockEndpoint("mock:result").expectedBodiesReceived("AB"); + + template.sendBody("direct:start", "A"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + myStrategy = new AggregationStrategyBeanAdapter(appender, "append"); + myStrategy.setCamelContext(getContext()); + + from("direct:start") + .pollEnrich("seda:foo", 100, myStrategy) + .to("mock:result"); + } + }; + } + + public static final class MyBodyAppender { + + public String append(String existing, String next) { + return existing + next; + } + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterStaticMethodTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterStaticMethodTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterStaticMethodTest.java new file mode 100644 index 0000000..16ed73c --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterStaticMethodTest.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; + +public class AggregationStrategyBeanAdapterStaticMethodTest extends ContextTestSupport { + + private AggregationStrategyBeanAdapter myStrategy; + + public void testAggregate() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("ABC"); + + template.sendBody("direct:start", "A"); + template.sendBody("direct:start", "B"); + template.sendBody("direct:start", "C"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + myStrategy = new AggregationStrategyBeanAdapter(MyBodyAppender.class, "append"); + myStrategy.setCamelContext(getContext()); + + from("direct:start") + .aggregate(constant(true), myStrategy) + .completionSize(3) + .to("mock:result"); + } + }; + } + + public static final class MyBodyAppender { + + public static String append(String existing, String next) { + if (next != null) { + return existing + next; + } else { + return existing; + } + } + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterTest.java new file mode 100644 index 0000000..cd47f1f --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; + +public class AggregationStrategyBeanAdapterTest extends ContextTestSupport { + + private MyBodyAppender appender = new MyBodyAppender(); + private AggregationStrategyBeanAdapter myStrategy; + + public void testAggregate() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("ABC"); + + template.sendBody("direct:start", "A"); + template.sendBody("direct:start", "B"); + template.sendBody("direct:start", "C"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + myStrategy = new AggregationStrategyBeanAdapter(appender, "append"); + myStrategy.setCamelContext(getContext()); + + from("direct:start") + .aggregate(constant(true), myStrategy) + .completionSize(3) + .to("mock:result"); + } + }; + } + + public static final class MyBodyAppender { + + public String append(String existing, String next) { + if (next != null) { + return existing + next; + } else { + return existing; + } + } + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersAndPropertiesTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersAndPropertiesTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersAndPropertiesTest.java new file mode 100644 index 0000000..49688d5 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersAndPropertiesTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import java.util.Map; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; + +public class AggregationStrategyBeanAdapterWithHeadersAndPropertiesTest extends ContextTestSupport { + + private MyBodyAppender appender = new MyBodyAppender(); + private AggregationStrategyBeanAdapter myStrategy; + + public void testAggregate() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("ABC"); + getMockEndpoint("mock:result").expectedHeaderReceived("foo", "yesyesyes"); + getMockEndpoint("mock:result").expectedPropertyReceived("count", 6); + + template.sendBodyAndProperty("direct:start", "A", "count", 1); + template.sendBodyAndProperty("direct:start", "B", "count", 2); + template.sendBodyAndProperty("direct:start", "C", "count", 3); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + myStrategy = new AggregationStrategyBeanAdapter(appender, "appendWithHeadersAndProperties"); + myStrategy.setCamelContext(getContext()); + + from("direct:start") + .setHeader("foo", constant("yes")) + .aggregate(constant(true), myStrategy) + .completionSize(3) + .to("mock:result"); + } + }; + } + + public static final class MyBodyAppender { + + public String appendWithHeadersAndProperties(String existing, Map oldHeaders, Map oldProperties, + String next, Map newHeaders, Map newProperties) { + if (next != null) { + Integer count = (Integer) oldProperties.get("count") + (Integer) newProperties.get("count"); + oldProperties.put("count", count); + String foo = oldHeaders.get("foo") + (String) newHeaders.get("foo"); + oldHeaders.put("foo", foo); + return existing + next; + } else { + return existing; + } + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5aa3ba6c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersTest.java new file mode 100644 index 0000000..4230050 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyBeanAdapterWithHeadersTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import java.util.Map; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; + +public class AggregationStrategyBeanAdapterWithHeadersTest extends ContextTestSupport { + + private MyBodyAppender appender = new MyBodyAppender(); + private AggregationStrategyBeanAdapter myStrategy; + + public void testAggregate() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("ABC"); + getMockEndpoint("mock:result").expectedHeaderReceived("count", 6); + + template.sendBodyAndHeader("direct:start", "A", "count", 1); + template.sendBodyAndHeader("direct:start", "B", "count", 2); + template.sendBodyAndHeader("direct:start", "C", "count", 3); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + myStrategy = new AggregationStrategyBeanAdapter(appender, "appendWithHeaders"); + myStrategy.setCamelContext(getContext()); + + from("direct:start") + .aggregate(constant(true), myStrategy) + .completionSize(3) + .to("mock:result"); + } + }; + } + + public static final class MyBodyAppender { + + public String appendWithHeaders(String existing, Map oldHeaders, String next, Map newHeaders) { + if (next != null) { + Integer count = (Integer) oldHeaders.get("count") + (Integer) newHeaders.get("count"); + oldHeaders.put("count", count); + return existing + next; + } else { + return existing; + } + } + } +}