Repository: camel Updated Branches: refs/heads/master 28128d9f3 -> d2ea9b510
CAMEL-8966: Lets use better name Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d2ea9b51 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d2ea9b51 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d2ea9b51 Branch: refs/heads/master Commit: d2ea9b5102630d7b80c95b616acbad4b1ead1bfb Parents: 28128d9 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Jul 20 14:42:36 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Jul 20 14:42:36 2015 +0200 ---------------------------------------------------------------------- .../mbean/ManagedDynamicSendProcessorMBean.java | 35 --- .../mbean/ManagedSendDynamicProcessorMBean.java | 35 +++ .../DefaultManagementObjectStrategy.java | 8 +- .../mbean/ManagedDynamicSendProcessor.java | 74 ------- .../mbean/ManagedSendDynamicProcessor.java | 74 +++++++ .../apache/camel/model/DynamicToDefinition.java | 187 ---------------- .../apache/camel/model/ProcessorDefinition.java | 4 +- .../apache/camel/model/ToDynamicDefinition.java | 187 ++++++++++++++++ .../camel/processor/DynamicSendProcessor.java | 214 ------------------- .../camel/processor/SendDynamicProcessor.java | 214 +++++++++++++++++++ .../apache/camel/processor/SendProcessor.java | 2 +- .../resources/org/apache/camel/model/jaxb.index | 2 +- .../ManagedDynamicSendProcessorTest.java | 95 -------- .../ManagedSendDynamicProcessorTest.java | 95 ++++++++ 14 files changed, 613 insertions(+), 613 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d2ea9b51/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedDynamicSendProcessorMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedDynamicSendProcessorMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedDynamicSendProcessorMBean.java deleted file mode 100644 index 7482cb8..0000000 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedDynamicSendProcessorMBean.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.api.management.mbean; - -import org.apache.camel.api.management.ManagedAttribute; - -public interface ManagedDynamicSendProcessorMBean extends ManagedProcessorMBean { - - @ManagedAttribute(description = "The uri of the endpoint to send to. The uri can be dynamic computed using the expressions.", mask = true) - String getUri(); - - @ManagedAttribute(description = "Message Exchange Pattern") - String getMessageExchangePattern(); - - @ManagedAttribute(description = "Sets the maximum size used by the ProducerCacheN which is used to cache and reuse producers.") - Integer getCacheSize(); - - @ManagedAttribute(description = "Ignore the invalidate endpoint exception when try to create a producer with that endpoint") - Boolean isIgnoreInvalidEndpoint(); - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/d2ea9b51/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java new file mode 100644 index 0000000..e1071ceb --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.api.management.mbean; + +import org.apache.camel.api.management.ManagedAttribute; + +public interface ManagedSendDynamicProcessorMBean extends ManagedProcessorMBean { + + @ManagedAttribute(description = "The uri of the endpoint to send to. The uri can be dynamic computed using the expressions.", mask = true) + String getUri(); + + @ManagedAttribute(description = "Message Exchange Pattern") + String getMessageExchangePattern(); + + @ManagedAttribute(description = "Sets the maximum size used by the ProducerCacheN which is used to cache and reuse producers.") + Integer getCacheSize(); + + @ManagedAttribute(description = "Ignore the invalidate endpoint exception when try to create a producer with that endpoint") + Boolean isIgnoreInvalidEndpoint(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/d2ea9b51/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java index 3832d1c..a8df7a9 100644 --- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java @@ -38,7 +38,7 @@ import org.apache.camel.management.mbean.ManagedCamelContext; import org.apache.camel.management.mbean.ManagedComponent; import org.apache.camel.management.mbean.ManagedConsumer; import org.apache.camel.management.mbean.ManagedDelayer; -import org.apache.camel.management.mbean.ManagedDynamicSendProcessor; +import org.apache.camel.management.mbean.ManagedSendDynamicProcessor; import org.apache.camel.management.mbean.ManagedEndpoint; import org.apache.camel.management.mbean.ManagedErrorHandler; import org.apache.camel.management.mbean.ManagedEventNotifier; @@ -56,7 +56,7 @@ import org.apache.camel.management.mbean.ManagedThroughputLogger; import org.apache.camel.model.ModelCamelContext; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.processor.Delayer; -import org.apache.camel.processor.DynamicSendProcessor; +import org.apache.camel.processor.SendDynamicProcessor; import org.apache.camel.processor.ErrorHandler; import org.apache.camel.processor.SendProcessor; import org.apache.camel.processor.Throttler; @@ -183,8 +183,8 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy answer = new ManagedDelayer(context, (Delayer) target, definition); } else if (target instanceof Throttler) { answer = new ManagedThrottler(context, (Throttler) target, definition); - } else if (target instanceof DynamicSendProcessor) { - answer = new ManagedDynamicSendProcessor(context, (DynamicSendProcessor) target, definition); + } else if (target instanceof SendDynamicProcessor) { + answer = new ManagedSendDynamicProcessor(context, (SendDynamicProcessor) target, definition); } else if (target instanceof SendProcessor) { SendProcessor sp = (SendProcessor) target; // special for sending to throughput logger http://git-wip-us.apache.org/repos/asf/camel/blob/d2ea9b51/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedDynamicSendProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedDynamicSendProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedDynamicSendProcessor.java deleted file mode 100644 index 7b32007..0000000 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedDynamicSendProcessor.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.management.mbean; - -import org.apache.camel.CamelContext; -import org.apache.camel.api.management.ManagedResource; -import org.apache.camel.api.management.mbean.ManagedDynamicSendProcessorMBean; -import org.apache.camel.model.ProcessorDefinition; -import org.apache.camel.processor.DynamicSendProcessor; -import org.apache.camel.spi.ManagementStrategy; -import org.apache.camel.util.URISupport; - -/** - * @version - */ -@ManagedResource(description = "Managed DynamicSendProcessor") -public class ManagedDynamicSendProcessor extends ManagedProcessor implements ManagedDynamicSendProcessorMBean { - private final DynamicSendProcessor processor; - private String uri; - - public ManagedDynamicSendProcessor(CamelContext context, DynamicSendProcessor processor, ProcessorDefinition<?> definition) { - super(context, processor, definition); - this.processor = processor; - } - - public void init(ManagementStrategy strategy) { - super.init(strategy); - boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false; - if (sanitize) { - uri = URISupport.sanitizeUri(processor.getUri()); - } else { - uri = processor.getUri(); - } - } - - public DynamicSendProcessor getProcessor() { - return processor; - } - - public String getUri() { - return uri; - } - - public String getMessageExchangePattern() { - if (processor.getPattern() != null) { - return processor.getPattern().name(); - } else { - return null; - } - } - - public Integer getCacheSize() { - return processor.getCacheSize(); - } - - public Boolean isIgnoreInvalidEndpoint() { - return processor.isIgnoreInvalidEndpoint(); - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/d2ea9b51/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java new file mode 100644 index 0000000..21a4207 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management.mbean; + +import org.apache.camel.CamelContext; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.ManagedSendDynamicProcessorMBean; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.processor.SendDynamicProcessor; +import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.util.URISupport; + +/** + * @version + */ +@ManagedResource(description = "Managed SendDynamicProcessor") +public class ManagedSendDynamicProcessor extends ManagedProcessor implements ManagedSendDynamicProcessorMBean { + private final SendDynamicProcessor processor; + private String uri; + + public ManagedSendDynamicProcessor(CamelContext context, SendDynamicProcessor processor, ProcessorDefinition<?> definition) { + super(context, processor, definition); + this.processor = processor; + } + + public void init(ManagementStrategy strategy) { + super.init(strategy); + boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false; + if (sanitize) { + uri = URISupport.sanitizeUri(processor.getUri()); + } else { + uri = processor.getUri(); + } + } + + public SendDynamicProcessor getProcessor() { + return processor; + } + + public String getUri() { + return uri; + } + + public String getMessageExchangePattern() { + if (processor.getPattern() != null) { + return processor.getPattern().name(); + } else { + return null; + } + } + + public Integer getCacheSize() { + return processor.getCacheSize(); + } + + public Boolean isIgnoreInvalidEndpoint() { + return processor.isIgnoreInvalidEndpoint(); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/d2ea9b51/camel-core/src/main/java/org/apache/camel/model/DynamicToDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/DynamicToDefinition.java b/camel-core/src/main/java/org/apache/camel/model/DynamicToDefinition.java deleted file mode 100644 index cd264a2..0000000 --- a/camel-core/src/main/java/org/apache/camel/model/DynamicToDefinition.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.model; - -import java.util.ArrayList; -import java.util.List; -import javax.xml.bind.annotation.XmlAccessType; -import javax.xml.bind.annotation.XmlAccessorType; -import javax.xml.bind.annotation.XmlAttribute; -import javax.xml.bind.annotation.XmlRootElement; - -import org.apache.camel.ExchangePattern; -import org.apache.camel.Expression; -import org.apache.camel.NoSuchLanguageException; -import org.apache.camel.Processor; -import org.apache.camel.builder.ExpressionBuilder; -import org.apache.camel.processor.DynamicSendProcessor; -import org.apache.camel.spi.Language; -import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.RouteContext; -import org.apache.camel.util.ObjectHelper; - -/** - * Sends the message to a dynamic endpoint (uri supports languages) - * <p/> - * You can specify multiple languages in the uri separated by the plus sign, such as <tt>mock:+xpath:/order/@uri</tt> - * where <tt>mock:</tt> would be a prefix to a xpath expression. - * <p/> - * For more dynamic behavior use <a href="http://camel.apache.org/recipient-list.html">Recipient List</a> or - * <a href="http://camel.apache.org/dynamic-router.html">Dynamic Router</a> EIP instead. - */ -@Metadata(label = "eip,endpoint,routing") -@XmlRootElement(name = "toD") -@XmlAccessorType(XmlAccessType.FIELD) -public class DynamicToDefinition extends NoOutputDefinition<DynamicToDefinition> { - @XmlAttribute @Metadata(required = "true") - private String uri; - @XmlAttribute - private ExchangePattern pattern; - @XmlAttribute - private Integer cacheSize; - @XmlAttribute - private Boolean ignoreInvalidEndpoint; - - public DynamicToDefinition() { - } - - public DynamicToDefinition(String uri) { - this.uri = uri; - } - - @Override - public Processor createProcessor(RouteContext routeContext) throws Exception { - ObjectHelper.notEmpty(uri, "uri", this); - - List<Expression> list = new ArrayList<Expression>(); - String[] parts = uri.split("\\+"); - for (String part : parts) { - // the part may have optional language to use, so you can mix languages - String before = ObjectHelper.before(part, ":"); - String after = ObjectHelper.after(part, ":"); - if (before != null && after != null) { - // maybe its a language - try { - Language partLanguage = routeContext.getCamelContext().resolveLanguage(before); - Expression exp = partLanguage.createExpression(after); - list.add(exp); - continue; - } catch (NoSuchLanguageException e) { - // ignore - } - } - // fallback and use simple language - Language lan = routeContext.getCamelContext().resolveLanguage("simple"); - Expression exp = lan.createExpression(part); - list.add(exp); - } - - Expression exp; - if (list.size() == 1) { - exp = list.get(0); - } else { - exp = ExpressionBuilder.concatExpression(list); - } - - DynamicSendProcessor processor = new DynamicSendProcessor(uri, exp); - processor.setPattern(pattern); - if (cacheSize != null) { - processor.setCacheSize(cacheSize); - } - if (ignoreInvalidEndpoint != null) { - processor.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint); - } - return processor; - } - - @Override - public String toString() { - return "DynamicTo[" + getLabel() + "]"; - } - - // Fluent API - // ------------------------------------------------------------------------- - - /** - * Sets the optional {@link ExchangePattern} used to invoke this endpoint - */ - public DynamicToDefinition pattern(ExchangePattern pattern) { - setPattern(pattern); - return this; - } - - /** - * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used to cache and reuse producers. - * - * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off. - * @return the builder - */ - public DynamicToDefinition cacheSize(int cacheSize) { - setCacheSize(cacheSize); - return this; - } - - /** - * Ignore the invalidate endpoint exception when try to create a producer with that endpoint - * - * @return the builder - */ - public DynamicToDefinition ignoreInvalidEndpoint() { - setIgnoreInvalidEndpoint(true); - return this; - } - - // Properties - // ------------------------------------------------------------------------- - - public String getUri() { - return uri; - } - - /** - * The uri of the endpoint to send to. The uri can be dynamic computed using the {@link org.apache.camel.language.simple.SimpleLanguage} expression. - */ - public void setUri(String uri) { - this.uri = uri; - } - - public ExchangePattern getPattern() { - return pattern; - } - - public void setPattern(ExchangePattern pattern) { - this.pattern = pattern; - } - - public Integer getCacheSize() { - return cacheSize; - } - - public void setCacheSize(Integer cacheSize) { - this.cacheSize = cacheSize; - } - - public Boolean getIgnoreInvalidEndpoint() { - return ignoreInvalidEndpoint; - } - - public void setIgnoreInvalidEndpoint(Boolean ignoreInvalidEndpoint) { - this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; - } - - -} http://git-wip-us.apache.org/repos/asf/camel/blob/d2ea9b51/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 5d6bc36..17ccb0e 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -616,7 +616,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> */ @SuppressWarnings("unchecked") public Type toD(String uri) { - DynamicToDefinition answer = new DynamicToDefinition(); + ToDynamicDefinition answer = new ToDynamicDefinition(); answer.setUri(uri); addOutput(answer); return (Type) this; @@ -630,7 +630,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> */ @SuppressWarnings("unchecked") public Type toD(String uri, boolean ignoreInvalidEndpoint) { - DynamicToDefinition answer = new DynamicToDefinition(); + ToDynamicDefinition answer = new ToDynamicDefinition(); answer.setUri(uri); answer.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint); addOutput(answer); http://git-wip-us.apache.org/repos/asf/camel/blob/d2ea9b51/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java new file mode 100644 index 0000000..43db47b --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.model; + +import java.util.ArrayList; +import java.util.List; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.camel.ExchangePattern; +import org.apache.camel.Expression; +import org.apache.camel.NoSuchLanguageException; +import org.apache.camel.Processor; +import org.apache.camel.builder.ExpressionBuilder; +import org.apache.camel.processor.SendDynamicProcessor; +import org.apache.camel.spi.Language; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.RouteContext; +import org.apache.camel.util.ObjectHelper; + +/** + * Sends the message to a dynamic endpoint (uri supports languages) + * <p/> + * You can specify multiple languages in the uri separated by the plus sign, such as <tt>mock:+xpath:/order/@uri</tt> + * where <tt>mock:</tt> would be a prefix to a xpath expression. + * <p/> + * For more dynamic behavior use <a href="http://camel.apache.org/recipient-list.html">Recipient List</a> or + * <a href="http://camel.apache.org/dynamic-router.html">Dynamic Router</a> EIP instead. + */ +@Metadata(label = "eip,endpoint,routing") +@XmlRootElement(name = "toD") +@XmlAccessorType(XmlAccessType.FIELD) +public class ToDynamicDefinition extends NoOutputDefinition<ToDynamicDefinition> { + @XmlAttribute @Metadata(required = "true") + private String uri; + @XmlAttribute + private ExchangePattern pattern; + @XmlAttribute + private Integer cacheSize; + @XmlAttribute + private Boolean ignoreInvalidEndpoint; + + public ToDynamicDefinition() { + } + + public ToDynamicDefinition(String uri) { + this.uri = uri; + } + + @Override + public Processor createProcessor(RouteContext routeContext) throws Exception { + ObjectHelper.notEmpty(uri, "uri", this); + + List<Expression> list = new ArrayList<Expression>(); + String[] parts = uri.split("\\+"); + for (String part : parts) { + // the part may have optional language to use, so you can mix languages + String before = ObjectHelper.before(part, ":"); + String after = ObjectHelper.after(part, ":"); + if (before != null && after != null) { + // maybe its a language + try { + Language partLanguage = routeContext.getCamelContext().resolveLanguage(before); + Expression exp = partLanguage.createExpression(after); + list.add(exp); + continue; + } catch (NoSuchLanguageException e) { + // ignore + } + } + // fallback and use simple language + Language lan = routeContext.getCamelContext().resolveLanguage("simple"); + Expression exp = lan.createExpression(part); + list.add(exp); + } + + Expression exp; + if (list.size() == 1) { + exp = list.get(0); + } else { + exp = ExpressionBuilder.concatExpression(list); + } + + SendDynamicProcessor processor = new SendDynamicProcessor(uri, exp); + processor.setPattern(pattern); + if (cacheSize != null) { + processor.setCacheSize(cacheSize); + } + if (ignoreInvalidEndpoint != null) { + processor.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint); + } + return processor; + } + + @Override + public String toString() { + return "DynamicTo[" + getLabel() + "]"; + } + + // Fluent API + // ------------------------------------------------------------------------- + + /** + * Sets the optional {@link ExchangePattern} used to invoke this endpoint + */ + public ToDynamicDefinition pattern(ExchangePattern pattern) { + setPattern(pattern); + return this; + } + + /** + * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used to cache and reuse producers. + * + * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off. + * @return the builder + */ + public ToDynamicDefinition cacheSize(int cacheSize) { + setCacheSize(cacheSize); + return this; + } + + /** + * Ignore the invalidate endpoint exception when try to create a producer with that endpoint + * + * @return the builder + */ + public ToDynamicDefinition ignoreInvalidEndpoint() { + setIgnoreInvalidEndpoint(true); + return this; + } + + // Properties + // ------------------------------------------------------------------------- + + public String getUri() { + return uri; + } + + /** + * The uri of the endpoint to send to. The uri can be dynamic computed using the {@link org.apache.camel.language.simple.SimpleLanguage} expression. + */ + public void setUri(String uri) { + this.uri = uri; + } + + public ExchangePattern getPattern() { + return pattern; + } + + public void setPattern(ExchangePattern pattern) { + this.pattern = pattern; + } + + public Integer getCacheSize() { + return cacheSize; + } + + public void setCacheSize(Integer cacheSize) { + this.cacheSize = cacheSize; + } + + public Boolean getIgnoreInvalidEndpoint() { + return ignoreInvalidEndpoint; + } + + public void setIgnoreInvalidEndpoint(Boolean ignoreInvalidEndpoint) { + this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; + } + + +} http://git-wip-us.apache.org/repos/asf/camel/blob/d2ea9b51/camel-core/src/main/java/org/apache/camel/processor/DynamicSendProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/DynamicSendProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/DynamicSendProcessor.java deleted file mode 100644 index a8d5994..0000000 --- a/camel-core/src/main/java/org/apache/camel/processor/DynamicSendProcessor.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.processor; - -import org.apache.camel.AsyncCallback; -import org.apache.camel.AsyncProcessor; -import org.apache.camel.AsyncProducerCallback; -import org.apache.camel.CamelContext; -import org.apache.camel.CamelContextAware; -import org.apache.camel.Endpoint; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; -import org.apache.camel.Expression; -import org.apache.camel.NoTypeConversionAvailableException; -import org.apache.camel.Producer; -import org.apache.camel.impl.EmptyProducerCache; -import org.apache.camel.impl.ProducerCache; -import org.apache.camel.spi.IdAware; -import org.apache.camel.support.ServiceSupport; -import org.apache.camel.util.AsyncProcessorHelper; -import org.apache.camel.util.EndpointHelper; -import org.apache.camel.util.ExchangeHelper; -import org.apache.camel.util.ServiceHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Processor for forwarding exchanges to a dynamic endpoint destination. - * - * @see org.apache.camel.processor.SendProcessor - */ -public class DynamicSendProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware { - protected static final Logger LOG = LoggerFactory.getLogger(DynamicSendProcessor.class); - protected CamelContext camelContext; - protected final String uri; - protected final Expression expression; - protected ExchangePattern pattern; - protected ProducerCache producerCache; - protected String id; - protected boolean ignoreInvalidEndpoint; - protected int cacheSize; - - public DynamicSendProcessor(String uri, Expression expression) { - this.uri = uri; - this.expression = expression; - } - - @Override - public String toString() { - return "sendTo(" + getExpression() + ")"; - } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public void process(final Exchange exchange) throws Exception { - AsyncProcessorHelper.process(this, exchange); - } - - public boolean process(Exchange exchange, final AsyncCallback callback) { - if (!isStarted()) { - exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this)); - callback.done(true); - return true; - } - - // we should preserve existing MEP so remember old MEP - // if you want to permanently to change the MEP then use .setExchangePattern in the DSL - final ExchangePattern existingPattern = exchange.getPattern(); - - // which endpoint to send to - final Endpoint endpoint; - final ExchangePattern destinationExchangePattern; - - // use dynamic endpoint so calculate the endpoint to use - Object recipient = null; - try { - recipient = expression.evaluate(exchange, Object.class); - endpoint = resolveEndpoint(exchange, recipient); - destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(endpoint.getEndpointUri()); - } catch (Throwable e) { - if (isIgnoreInvalidEndpoint()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e); - } - } else { - exchange.setException(e); - } - callback.done(true); - return true; - } - - // send the exchange to the destination using the producer cache - return producerCache.doInAsyncProducer(endpoint, exchange, pattern, callback, new AsyncProducerCallback() { - public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange, - ExchangePattern pattern, final AsyncCallback callback) { - final Exchange target = configureExchange(exchange, pattern, destinationExchangePattern, endpoint); - LOG.debug(">>>> {} {}", endpoint, exchange); - return asyncProducer.process(target, new AsyncCallback() { - public void done(boolean doneSync) { - // restore previous MEP - target.setPattern(existingPattern); - // signal we are done - callback.done(doneSync); - } - }); - } - }); - } - - protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { - // trim strings as end users might have added spaces between separators - if (recipient instanceof String) { - recipient = ((String) recipient).trim(); - } else if (recipient instanceof Endpoint) { - return (Endpoint) recipient; - } else { - // convert to a string type we can work with - recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); - } - - return ExchangeHelper.resolveEndpoint(exchange, recipient); - } - - protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern, ExchangePattern destinationExchangePattern, Endpoint endpoint) { - // destination exchange pattern overrides pattern - if (destinationExchangePattern != null) { - exchange.setPattern(destinationExchangePattern); - } else if (pattern != null) { - exchange.setPattern(pattern); - } - // set property which endpoint we send to - exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); - return exchange; - } - - protected void doStart() throws Exception { - if (producerCache == null) { - if (cacheSize < 0) { - producerCache = new EmptyProducerCache(this, camelContext); - LOG.debug("DynamicSendTo {} is not using ProducerCache", this); - } else if (cacheSize == 0) { - producerCache = new ProducerCache(this, camelContext); - LOG.debug("DynamicSendTo {} using ProducerCache with default cache size", this); - } else { - producerCache = new ProducerCache(this, camelContext, cacheSize); - LOG.debug("DynamicSendTo {} using ProducerCache with cacheSize={}", this, cacheSize); - } - } - } - - protected void doStop() throws Exception { - ServiceHelper.stopServices(producerCache); - } - - public CamelContext getCamelContext() { - return camelContext; - } - - public void setCamelContext(CamelContext camelContext) { - this.camelContext = camelContext; - } - - public String getUri() { - return uri; - } - - public Expression getExpression() { - return expression; - } - - public ExchangePattern getPattern() { - return pattern; - } - - public void setPattern(ExchangePattern pattern) { - this.pattern = pattern; - } - - public boolean isIgnoreInvalidEndpoint() { - return ignoreInvalidEndpoint; - } - - public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) { - this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; - } - - public int getCacheSize() { - return cacheSize; - } - - public void setCacheSize(int cacheSize) { - this.cacheSize = cacheSize; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/d2ea9b51/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java new file mode 100644 index 0000000..b207ba5 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.AsyncProducerCallback; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Expression; +import org.apache.camel.NoTypeConversionAvailableException; +import org.apache.camel.Producer; +import org.apache.camel.impl.EmptyProducerCache; +import org.apache.camel.impl.ProducerCache; +import org.apache.camel.spi.IdAware; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.EndpointHelper; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.ServiceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Processor for forwarding exchanges to a dynamic endpoint destination. + * + * @see org.apache.camel.processor.SendProcessor + */ +public class SendDynamicProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware { + protected static final Logger LOG = LoggerFactory.getLogger(SendDynamicProcessor.class); + protected CamelContext camelContext; + protected final String uri; + protected final Expression expression; + protected ExchangePattern pattern; + protected ProducerCache producerCache; + protected String id; + protected boolean ignoreInvalidEndpoint; + protected int cacheSize; + + public SendDynamicProcessor(String uri, Expression expression) { + this.uri = uri; + this.expression = expression; + } + + @Override + public String toString() { + return "sendTo(" + getExpression() + ")"; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public void process(final Exchange exchange) throws Exception { + AsyncProcessorHelper.process(this, exchange); + } + + public boolean process(Exchange exchange, final AsyncCallback callback) { + if (!isStarted()) { + exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this)); + callback.done(true); + return true; + } + + // we should preserve existing MEP so remember old MEP + // if you want to permanently to change the MEP then use .setExchangePattern in the DSL + final ExchangePattern existingPattern = exchange.getPattern(); + + // which endpoint to send to + final Endpoint endpoint; + final ExchangePattern destinationExchangePattern; + + // use dynamic endpoint so calculate the endpoint to use + Object recipient = null; + try { + recipient = expression.evaluate(exchange, Object.class); + endpoint = resolveEndpoint(exchange, recipient); + destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(endpoint.getEndpointUri()); + } catch (Throwable e) { + if (isIgnoreInvalidEndpoint()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e); + } + } else { + exchange.setException(e); + } + callback.done(true); + return true; + } + + // send the exchange to the destination using the producer cache + return producerCache.doInAsyncProducer(endpoint, exchange, pattern, callback, new AsyncProducerCallback() { + public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange, + ExchangePattern pattern, final AsyncCallback callback) { + final Exchange target = configureExchange(exchange, pattern, destinationExchangePattern, endpoint); + LOG.debug(">>>> {} {}", endpoint, exchange); + return asyncProducer.process(target, new AsyncCallback() { + public void done(boolean doneSync) { + // restore previous MEP + target.setPattern(existingPattern); + // signal we are done + callback.done(doneSync); + } + }); + } + }); + } + + protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { + // trim strings as end users might have added spaces between separators + if (recipient instanceof String) { + recipient = ((String) recipient).trim(); + } else if (recipient instanceof Endpoint) { + return (Endpoint) recipient; + } else { + // convert to a string type we can work with + recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); + } + + return ExchangeHelper.resolveEndpoint(exchange, recipient); + } + + protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern, ExchangePattern destinationExchangePattern, Endpoint endpoint) { + // destination exchange pattern overrides pattern + if (destinationExchangePattern != null) { + exchange.setPattern(destinationExchangePattern); + } else if (pattern != null) { + exchange.setPattern(pattern); + } + // set property which endpoint we send to + exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); + return exchange; + } + + protected void doStart() throws Exception { + if (producerCache == null) { + if (cacheSize < 0) { + producerCache = new EmptyProducerCache(this, camelContext); + LOG.debug("DynamicSendTo {} is not using ProducerCache", this); + } else if (cacheSize == 0) { + producerCache = new ProducerCache(this, camelContext); + LOG.debug("DynamicSendTo {} using ProducerCache with default cache size", this); + } else { + producerCache = new ProducerCache(this, camelContext, cacheSize); + LOG.debug("DynamicSendTo {} using ProducerCache with cacheSize={}", this, cacheSize); + } + } + } + + protected void doStop() throws Exception { + ServiceHelper.stopServices(producerCache); + } + + public CamelContext getCamelContext() { + return camelContext; + } + + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + public String getUri() { + return uri; + } + + public Expression getExpression() { + return expression; + } + + public ExchangePattern getPattern() { + return pattern; + } + + public void setPattern(ExchangePattern pattern) { + this.pattern = pattern; + } + + public boolean isIgnoreInvalidEndpoint() { + return ignoreInvalidEndpoint; + } + + public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) { + this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; + } + + public int getCacheSize() { + return cacheSize; + } + + public void setCacheSize(int cacheSize) { + this.cacheSize = cacheSize; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/d2ea9b51/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java index 0304af9..a8637ae 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java @@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory; /** * Processor for forwarding exchanges to a static endpoint destination. * - * @see org.apache.camel.processor.DynamicSendProcessor + * @see SendDynamicProcessor */ public class SendProcessor extends ServiceSupport implements AsyncProcessor, Traceable, EndpointAware, IdAware { protected static final Logger LOG = LoggerFactory.getLogger(SendProcessor.class); http://git-wip-us.apache.org/repos/asf/camel/blob/d2ea9b51/camel-core/src/main/resources/org/apache/camel/model/jaxb.index ---------------------------------------------------------------------- diff --git a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index index 7f54628..0fb9ac1 100644 --- a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index +++ b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index @@ -25,7 +25,6 @@ DataFormatDefinition DelayDefinition DescriptionDefinition DynamicRouterDefinition -DynamicToDefinition EnrichDefinition ExpressionSubElementDefinition FilterDefinition @@ -86,6 +85,7 @@ ThreadsDefinition ThrottleDefinition ThrowExceptionDefinition ToDefinition +ToDynamicDefinition TransactedDefinition TransformDefinition TryDefinition http://git-wip-us.apache.org/repos/asf/camel/blob/d2ea9b51/camel-core/src/test/java/org/apache/camel/management/ManagedDynamicSendProcessorTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedDynamicSendProcessorTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedDynamicSendProcessorTest.java deleted file mode 100644 index 6e17a46..0000000 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedDynamicSendProcessorTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.management; - -import javax.management.MBeanServer; -import javax.management.ObjectName; -import javax.management.openmbean.TabularData; - -import org.apache.camel.ServiceStatus; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; - -/** - * @version - */ -public class ManagedDynamicSendProcessorTest extends ManagementTestSupport { - - public void testManageDynamicSendProcessor() throws Exception { - // JMX tests dont work well on AIX CI servers (hangs them) - if (isPlatform("aix")) { - return; - } - - MockEndpoint foo = getMockEndpoint("mock:foo"); - foo.expectedMessageCount(1); - - template.sendBodyAndHeader("direct:start", "Hello World", "whereto", "foo"); - - assertMockEndpointsSatisfied(); - - // get the stats for the route - MBeanServer mbeanServer = getMBeanServer(); - - // get the object name for the delayer - ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"mysend\""); - - // should be on route1 - String routeId = (String) mbeanServer.getAttribute(on, "RouteId"); - assertEquals("route1", routeId); - - String camelId = (String) mbeanServer.getAttribute(on, "CamelId"); - assertEquals("camel-1", camelId); - - String state = (String) mbeanServer.getAttribute(on, "State"); - assertEquals(ServiceStatus.Started.name(), state); - - String uri = (String) mbeanServer.getAttribute(on, "Uri"); - assertEquals("direct:${header.whereto}", uri); - - String pattern = (String) mbeanServer.getAttribute(on, "MessageExchangePattern"); - assertNull(pattern); - - TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); - assertNotNull(data); - assertEquals(2, data.size()); - - data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"}); - assertNotNull(data); - assertEquals(6, data.size()); - - String json = (String) mbeanServer.invoke(on, "informationJson", null, null); - assertNotNull(json); - assertTrue(json.contains("\"description\": \"Sends the message to a dynamic endpoint (uri supports languages)")); - assertTrue(json.contains(" \"uri\": { \"kind\": \"attribute\", \"required\": \"true\", \"type\": \"string\", \"javaType\": \"java.lang.String\"," - + " \"deprecated\": \"false\", \"value\": \"direct:${header.whereto}\"")); - } - - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:start") - .toD("direct:${header.whereto}").id("mysend"); - - from("direct:foo").to("mock:foo"); - } - }; - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/d2ea9b51/camel-core/src/test/java/org/apache/camel/management/ManagedSendDynamicProcessorTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedSendDynamicProcessorTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedSendDynamicProcessorTest.java new file mode 100644 index 0000000..38c67cb --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedSendDynamicProcessorTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.openmbean.TabularData; + +import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version + */ +public class ManagedSendDynamicProcessorTest extends ManagementTestSupport { + + public void testManageSendDynamicProcessor() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + MockEndpoint foo = getMockEndpoint("mock:foo"); + foo.expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start", "Hello World", "whereto", "foo"); + + assertMockEndpointsSatisfied(); + + // get the stats for the route + MBeanServer mbeanServer = getMBeanServer(); + + // get the object name for the delayer + ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"mysend\""); + + // should be on route1 + String routeId = (String) mbeanServer.getAttribute(on, "RouteId"); + assertEquals("route1", routeId); + + String camelId = (String) mbeanServer.getAttribute(on, "CamelId"); + assertEquals("camel-1", camelId); + + String state = (String) mbeanServer.getAttribute(on, "State"); + assertEquals(ServiceStatus.Started.name(), state); + + String uri = (String) mbeanServer.getAttribute(on, "Uri"); + assertEquals("direct:${header.whereto}", uri); + + String pattern = (String) mbeanServer.getAttribute(on, "MessageExchangePattern"); + assertNull(pattern); + + TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); + assertNotNull(data); + assertEquals(2, data.size()); + + data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"}); + assertNotNull(data); + assertEquals(6, data.size()); + + String json = (String) mbeanServer.invoke(on, "informationJson", null, null); + assertNotNull(json); + assertTrue(json.contains("\"description\": \"Sends the message to a dynamic endpoint (uri supports languages)")); + assertTrue(json.contains(" \"uri\": { \"kind\": \"attribute\", \"required\": \"true\", \"type\": \"string\", \"javaType\": \"java.lang.String\"," + + " \"deprecated\": \"false\", \"value\": \"direct:${header.whereto}\"")); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .toD("direct:${header.whereto}").id("mysend"); + + from("direct:foo").to("mock:foo"); + } + }; + } + +}