This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch kamelet-poll-enrich in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4168dbc8f4cdc0bca6107723ef10f873c34fed61 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Dec 13 13:55:26 2023 +0100 CAMEL-20232: camel-core - Kamelets with Enrich and PollEnrich dynamic endpoints with template parameters --- .../camel/component/kamelet/KameletEnrichTest.java | 84 ++++++++++++++++++++++ .../component/kamelet/KameletPollEnrichTest.java | 66 +++++++++++++++++ .../org/apache/camel/reifier/EnrichReifier.java | 33 +++++---- .../apache/camel/reifier/PollEnrichReifier.java | 35 +++++---- 4 files changed, 190 insertions(+), 28 deletions(-) diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEnrichTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEnrichTest.java new file mode 100644 index 00000000000..efcbd2e16ad --- /dev/null +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEnrichTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class KameletEnrichTest extends CamelTestSupport { + + @Test + public void testEnrich() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("A"); + getMockEndpoint("mock:bar").expectedBodiesReceived("B"); + + getMockEndpoint("mock:foo").whenAnyExchangeReceived(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + String b = exchange.getMessage().getBody(String.class); + exchange.getMessage().setBody(b + b); + } + }); + + getMockEndpoint("mock:bar").whenAnyExchangeReceived(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + String b = exchange.getMessage().getBody(String.class); + exchange.getMessage().setBody("Hello " + b); + } + }); + + String out = template.requestBody("direct:foo", "A", String.class); + Assertions.assertEquals("AA", out); + + out = template.requestBody("direct:bar", "B", String.class); + Assertions.assertEquals("Hello B", out); + + MockEndpoint.assertIsSatisfied(context); + } + + // ********************************************** + // + // test set-up + // + // ********************************************** + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + routeTemplate("broker") + .templateParameter("queue") + .from("kamelet:source") + .enrich().simple("mock:{{queue}}"); + + from("direct:foo") + .kamelet("broker?queue=foo"); + + from("direct:bar") + .kamelet("broker?queue=bar"); + } + }; + } +} diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPollEnrichTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPollEnrichTest.java new file mode 100644 index 00000000000..33e5b846c05 --- /dev/null +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPollEnrichTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class KameletPollEnrichTest extends CamelTestSupport { + + @Test + public void testPollEnrich() throws Exception { + template.sendBody("seda:foo", "AA"); + template.sendBody("seda:bar", "Hello B"); + + String out = template.requestBody("direct:foo", "A", String.class); + Assertions.assertEquals("AA", out); + + out = template.requestBody("direct:bar", "B", String.class); + Assertions.assertEquals("Hello B", out); + + MockEndpoint.assertIsSatisfied(context); + } + + // ********************************************** + // + // test set-up + // + // ********************************************** + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + routeTemplate("broker") + .templateParameter("queue") + .from("kamelet:source") + .pollEnrich().simple("seda:{{queue}}").timeout(5000); + + from("direct:foo") + .kamelet("broker?queue=foo"); + + from("direct:bar") + .kamelet("broker?queue=bar"); + } + }; + } +} diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java index 337b99c5c58..2302628c569 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java @@ -23,9 +23,12 @@ import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.model.EnrichDefinition; import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.ProcessorDefinitionHelper; +import org.apache.camel.model.RouteDefinition; import org.apache.camel.model.language.ConstantExpression; import org.apache.camel.processor.Enricher; import org.apache.camel.support.DefaultExchange; +import org.apache.camel.support.EndpointHelper; public class EnrichReifier extends ExpressionReifier<EnrichDefinition> { @@ -35,25 +38,27 @@ public class EnrichReifier extends ExpressionReifier<EnrichDefinition> { @Override public Processor createProcessor() throws Exception { - boolean isShareUnitOfWork = parseBoolean(definition.getShareUnitOfWork(), false); - boolean isIgnoreInvalidEndpoint = parseBoolean(definition.getIgnoreInvalidEndpoint(), false); - boolean isAggregateOnException = parseBoolean(definition.getAggregateOnException(), false); - - Enricher enricher; + Expression exp; + String uri; if (definition.getExpression() instanceof ConstantExpression) { - Expression exp = createExpression(definition.getExpression()); + exp = createExpression(definition.getExpression()); Exchange ex = new DefaultExchange(camelContext); - String uri = exp.evaluate(ex, String.class); - enricher = new Enricher(exp, uri); + uri = exp.evaluate(ex, String.class); } else { - Expression exp = createExpression(definition.getExpression()); - String uri = definition.getExpression().getExpression(); - enricher = new Enricher(exp, uri); + exp = createExpression(definition.getExpression()); + uri = definition.getExpression().getExpression(); + } + + // route templates should pre parse uri as they have dynamic values as part of their template parameters + RouteDefinition rd = ProcessorDefinitionHelper.getRoute(definition); + if (rd != null && rd.isTemplate() != null && rd.isTemplate()) { + uri = EndpointHelper.resolveEndpointUriPropertyPlaceholders(camelContext, uri); } - enricher.setShareUnitOfWork(isShareUnitOfWork); - enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint); - enricher.setAggregateOnException(isAggregateOnException); + Enricher enricher = new Enricher(exp, uri); + enricher.setShareUnitOfWork(parseBoolean(definition.getShareUnitOfWork(), false)); + enricher.setIgnoreInvalidEndpoint(parseBoolean(definition.getIgnoreInvalidEndpoint(), false)); + enricher.setAggregateOnException(parseBoolean(definition.getAggregateOnException(), false)); Integer num = parseInt(definition.getCacheSize()); if (num != null) { enricher.setCacheSize(num); diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java index 36d43a27205..4ddcbcc6ed8 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java @@ -23,9 +23,12 @@ import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.model.PollEnrichDefinition; import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.ProcessorDefinitionHelper; +import org.apache.camel.model.RouteDefinition; import org.apache.camel.model.language.ConstantExpression; import org.apache.camel.processor.PollEnricher; import org.apache.camel.support.DefaultExchange; +import org.apache.camel.support.EndpointHelper; public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> { @@ -35,23 +38,27 @@ public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> { @Override public Processor createProcessor() throws Exception { - // if no timeout then we should block, and there use a negative timeout - long time = parseDuration(definition.getTimeout(), -1); - boolean isIgnoreInvalidEndpoint = parseBoolean(definition.getIgnoreInvalidEndpoint(), false); - boolean isAggregateOnException = parseBoolean(definition.getAggregateOnException(), false); - - PollEnricher enricher; + Expression exp; + String uri; if (definition.getExpression() instanceof ConstantExpression) { - Expression exp = createExpression(definition.getExpression()); + exp = createExpression(definition.getExpression()); Exchange ex = new DefaultExchange(camelContext); - String uri = exp.evaluate(ex, String.class); - enricher = new PollEnricher(uri, time); + uri = exp.evaluate(ex, String.class); } else { - Expression exp = createExpression(definition.getExpression()); - String uri = definition.getExpression().getExpression(); - enricher = new PollEnricher(exp, uri, time); + exp = createExpression(definition.getExpression()); + uri = definition.getExpression().getExpression(); + } + + // route templates should pre parse uri as they have dynamic values as part of their template parameters + RouteDefinition rd = ProcessorDefinitionHelper.getRoute(definition); + if (rd != null && rd.isTemplate() != null && rd.isTemplate()) { + uri = EndpointHelper.resolveEndpointUriPropertyPlaceholders(camelContext, uri); } + // if no timeout then we should block, and there use a negative timeout + long timeout = parseDuration(definition.getTimeout(), -1); + + PollEnricher enricher = new PollEnricher(exp, uri, timeout); AggregationStrategy strategy = getConfiguredAggregationStrategy(definition); if (strategy != null) { enricher.setAggregationStrategy(strategy); @@ -60,8 +67,8 @@ public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> { if (num != null) { enricher.setCacheSize(num); } - enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint); - enricher.setAggregateOnException(isAggregateOnException); + enricher.setIgnoreInvalidEndpoint(parseBoolean(definition.getIgnoreInvalidEndpoint(), false)); + enricher.setAggregateOnException(parseBoolean(definition.getAggregateOnException(), false)); if (definition.getAutoStartComponents() != null) { enricher.setAutoStartupComponents(parseBoolean(definition.getAutoStartComponents(), true)); }