This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit 979288ecc735e8613a39ca5d627d2f288fb68c2f Author: Luca Burgazzoli <lburgazz...@gmail.com> AuthorDate: Mon Sep 28 20:59:58 2020 +0200 kamelet source/sink component #490 --- .../src/main/resources/application.properties | 2 +- .../src/test/resources/routes/set-body.yaml | 2 +- .../src/test/resources/routes/to-upper.yaml | 2 +- .../apache/camel/component/kamelet/kamelet.json | 2 +- .../apache/camel/component/kamelet/Kamelet.java | 97 ++++++++++++- .../camel/component/kamelet/KameletComponent.java | 154 +++++++-------------- .../camel/component/kamelet/KameletEndpoint.java | 7 +- .../camel/component/kamelet/KameletProducer.java | 2 +- .../src/test/resources/log4j2-test.xml | 2 + 9 files changed, 151 insertions(+), 119 deletions(-) diff --git a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties index 3ce5493..5b8f41a 100644 --- a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties +++ b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties @@ -18,6 +18,6 @@ # # Quarkus # -quarkus.log.console.enable = false +quarkus.log.console.enable = true quarkus.banner.enabled = false diff --git a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml index 55347e9..c311da1 100644 --- a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml +++ b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml @@ -16,7 +16,7 @@ # - from: - uri: "direct:{{routeId}}" + uri: "kamelet:source" steps: - set-body: constant: "{{bodyValue}}" \ No newline at end of file diff --git a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml index 74105f1..ba51838 100644 --- a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml +++ b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml @@ -16,7 +16,7 @@ # - from: - uri: "direct:{{routeId}}" + uri: "kamelet:source" steps: - set-body: constant: "{{message}}" diff --git a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json index a66f1ca..4dd132f 100644 --- a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json +++ b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json @@ -6,7 +6,7 @@ "description": "The Kamelet Component provides support for interacting with Knative", "deprecated": false, "firstVersion": "3.5.0", - "label": "camel-k", + "label": "core", "javaType": "org.apache.camel.component.kamelet.KameletComponent", "supportLevel": "Preview", "groupId": "org.apache.camel.k", diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java index 689c1ed..14ecc43 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java @@ -17,19 +17,32 @@ package org.apache.camel.component.kamelet; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Properties; +import java.util.StringJoiner; import java.util.function.Predicate; import org.apache.camel.CamelContext; +import org.apache.camel.model.FromDefinition; +import org.apache.camel.model.ModelCamelContext; +import org.apache.camel.model.RouteDefinition; +import org.apache.camel.model.RouteTemplateDefinition; +import org.apache.camel.model.RouteTemplateParameterDefinition; +import org.apache.camel.model.ToDefinition; import org.apache.camel.spi.PropertiesComponent; +import org.apache.camel.support.CamelContextHelper; import org.apache.camel.util.StringHelper; +import static org.apache.camel.model.ProcessorDefinitionHelper.filterTypeInOutputs; + public final class Kamelet { public static final String PROPERTIES_PREFIX = "camel.kamelet."; public static final String SCHEME = "kamelet"; public static final String SOURCE_ID = "source"; public static final String SINK_ID = "sink"; + public static final String PARAM_ROUTE_ID = "routeId"; + public static final String PARAM_TEMPLATE_ID = "templateId"; private Kamelet() { } @@ -38,9 +51,14 @@ public final class Kamelet { return item -> item.startsWith(prefix); } - public static String extractTemplateId(CamelContext context, String remaining) { + public static String extractTemplateId(CamelContext context, String remaining, Map<String, Object> parameters) { + Object param = parameters.get(PARAM_TEMPLATE_ID); + if (param != null) { + return CamelContextHelper.mandatoryConvertTo(context, String.class, param); + } + if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) { - return context.resolvePropertyPlaceholders("{{templateId}}"); + return context.resolvePropertyPlaceholders("{{" + PARAM_TEMPLATE_ID + "}}"); } String answer = StringHelper.before(remaining, "/"); @@ -51,14 +69,19 @@ public final class Kamelet { return answer; } - public static String extractRouteId(CamelContext context, String remaining) { + public static String extractRouteId(CamelContext context, String remaining, Map<String, Object> parameters) { + Object param = parameters.get(PARAM_ROUTE_ID); + if (param != null) { + return CamelContextHelper.mandatoryConvertTo(context, String.class, param); + } + if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) { - return context.resolvePropertyPlaceholders("{{routeId}}"); + return context.resolvePropertyPlaceholders("{{" + PARAM_ROUTE_ID + "}}"); } String answer = StringHelper.after(remaining, "/"); if (answer == null) { - answer = extractTemplateId(context, remaining) + "-" + context.getUuidGenerator().generateUuid(); + answer = extractTemplateId(context, remaining, parameters) + "-" + context.getUuidGenerator().generateUuid(); } return answer; @@ -84,4 +107,68 @@ public final class Kamelet { return properties; } + + public static String addRouteFromTemplate(ModelCamelContext context, String routeId, String routeTemplateId, Map<String, Object> parameters) throws Exception { + RouteTemplateDefinition target = null; + for (RouteTemplateDefinition def : context.getRouteTemplateDefinitions()) { + if (routeTemplateId.equals(def.getId())) { + target = def; + break; + } + } + if (target == null) { + throw new IllegalArgumentException("Cannot find RouteTemplate with id " + routeTemplateId); + } + + StringJoiner templatesBuilder = new StringJoiner(", "); + final Map<String, Object> prop = new HashMap<>(); + // include default values first from the template (and validate that we have inputs for all required parameters) + if (target.getTemplateParameters() != null) { + for (RouteTemplateParameterDefinition temp : target.getTemplateParameters()) { + if (temp.getDefaultValue() != null) { + prop.put(temp.getName(), temp.getDefaultValue()); + } else { + // this is a required parameter do we have that as input + if (!parameters.containsKey(temp.getName())) { + templatesBuilder.add(temp.getName()); + } + } + } + } + if (templatesBuilder.length() > 0) { + throw new IllegalArgumentException( + "Route template " + routeTemplateId + " the following mandatory parameters must be provided: " + + templatesBuilder.toString()); + } + // then override with user parameters + if (parameters != null) { + prop.putAll(parameters); + } + + RouteDefinition def = target.asRouteDefinition(); + // must make deep copy of input + def.setInput(null); + def.setInput(new FromDefinition(target.getRoute().getInput().getEndpointUri())); + if (routeId != null) { + def.setId(routeId); + } + // must make the source and sink endpoints are unique by appending the route id before we create the route from the template + if (def.getInput().getEndpointUri().startsWith("kamelet:source") || def.getInput().getEndpointUri().startsWith("kamelet//source")) { + def.getInput().setUri("kamelet:source?" + PARAM_ROUTE_ID + "=" + routeId); + } + Iterator<ToDefinition> it = filterTypeInOutputs(def.getOutputs(), ToDefinition.class); + while (it.hasNext()) { + ToDefinition to = it.next(); + if (to.getEndpointUri().startsWith("kamelet:sink") || to.getEndpointUri().startsWith("kamelet://sink")) { + to.setUri("kamelet:sink?" + PARAM_ROUTE_ID + "=" + routeId); + } + } + + def.setTemplateParameters(prop); + context.removeRouteDefinition(def); + context.getRouteDefinitions().add(def); + + return def.getId(); + } + } diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java index 1cc92b2..9696c90 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java @@ -17,12 +17,9 @@ package org.apache.camel.component.kamelet; import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -30,14 +27,8 @@ import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.RuntimeCamelException; import org.apache.camel.VetoCamelContextStartException; -import org.apache.camel.model.EndpointRequiredDefinition; -import org.apache.camel.model.FromDefinition; import org.apache.camel.model.ModelCamelContext; -import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.model.RouteDefinition; -import org.apache.camel.model.RouteTemplateDefinition; -import org.apache.camel.model.RouteTemplateParameterDefinition; -import org.apache.camel.model.ToDefinition; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.PropertiesComponent; import org.apache.camel.spi.annotations.Component; @@ -47,8 +38,9 @@ import org.apache.camel.support.service.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import static org.apache.camel.model.ProcessorDefinitionHelper.filterTypeInOutputs; +import static org.apache.camel.component.kamelet.Kamelet.PARAM_ROUTE_ID; +import static org.apache.camel.component.kamelet.Kamelet.PARAM_TEMPLATE_ID; +import static org.apache.camel.component.kamelet.Kamelet.addRouteFromTemplate; /** * The Kamelet Component provides support for materializing routes templates. @@ -71,23 +63,53 @@ public class KameletComponent extends DefaultComponent { } @Override - public Endpoint createEndpoint(String uri) throws Exception { - return super.createEndpoint(uri); - } - - @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining); - final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining); - final String newUri = "kamelet:" + templateId + "/" + routeId; + final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining, parameters); + final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining, parameters); + + parameters.remove(PARAM_TEMPLATE_ID); + parameters.remove(PARAM_ROUTE_ID); final KameletEndpoint endpoint; - if (!Kamelet.SOURCE_ID.equals(remaining) && !Kamelet.SINK_ID.equals(remaining)) { - endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers) { + if (Kamelet.SOURCE_ID.equals(remaining) || Kamelet.SINK_ID.equals(remaining)) { + // + // if remaining is either `source` or `sink' then it is a virtual + // endpoint that is used inside the kamelet definition to mark it + // as in/out endpoint. + // + // The following snippet defines a template which will act as a + // consumer for this Kamelet: + // + // from("kamelet:source") + // .to("log:info") + // + // The following snippet defines a template which will act as a + // producer for this Kamelet: + // + // from("telegram:bots") + // .to("kamelet:sink") + // + // Note that at the moment, there's no enforcement around `source` + // and `sink' to be defined on the right side (producer or consumer) + // + endpoint = new KameletEndpoint(uri, this, templateId, routeId, consumers); + + // forward component properties + endpoint.setBlock(block); + endpoint.setTimeout(timeout); + + // set endpoint specific properties + setProperties(endpoint, parameters); + } else { + endpoint = new KameletEndpoint(uri, this, templateId, routeId, consumers) { @Override protected void doInit() throws Exception { super.doInit(); + // + // since this is the real kamelet, then we need to hand it + // over to the tracker. + // lifecycleHandler.track(this); } }; @@ -110,20 +132,11 @@ public class KameletComponent extends DefaultComponent { // Map<String, Object> kameletProperties = Kamelet.extractKameletProperties(getCamelContext(), templateId, routeId); kameletProperties.putAll(parameters); - kameletProperties.put("templateId", templateId); - kameletProperties.put("routeId", routeId); + kameletProperties.put(PARAM_TEMPLATE_ID, templateId); + kameletProperties.put(PARAM_ROUTE_ID, routeId); // set kamelet specific properties endpoint.setKameletProperties(kameletProperties); - } else { - endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers); - - // forward component properties - endpoint.setBlock(block); - endpoint.setTimeout(timeout); - - // set endpoint specific properties - setProperties(endpoint, parameters); } return endpoint; @@ -193,8 +206,6 @@ public class KameletComponent extends DefaultComponent { @Override public void onContextInitialized(CamelContext context) throws VetoCamelContextStartException { if (!this.initialized.compareAndExchange(false, true)) { - ModelCamelContext mcc = context.adapt(ModelCamelContext.class); - for (KameletEndpoint endpoint : endpoints) { try { createRouteForEndpoint(endpoint); @@ -233,12 +244,14 @@ public class KameletComponent extends DefaultComponent { if (!def.isPrepared()) { // when starting the route that was created from the template - // then we must provide the route id as local properties to the properties component - // as this route id is used internal by kamelets when they are starting + // then we must provide the route id as local properties to the + // properties component as this route id is used internal by + // kamelets when they are starting PropertiesComponent pc = context.getPropertiesComponent(); try { Properties prop = new Properties(); - prop.put("routeId", id); + prop.put(PARAM_TEMPLATE_ID, endpoint.getTemplateId()); + prop.put(PARAM_ROUTE_ID, id); pc.setLocalProperties(prop); context.startRouteDefinitions(List.of(def)); } finally { @@ -248,72 +261,5 @@ public class KameletComponent extends DefaultComponent { LOGGER.debug("Route with id={} created from template={}", id, endpoint.getTemplateId()); } - - private static String addRouteFromTemplate(final ModelCamelContext context, final String routeId, final String routeTemplateId, final Map<String, Object> parameters) - throws Exception { - RouteTemplateDefinition target = null; - for (RouteTemplateDefinition def : context.getRouteTemplateDefinitions()) { - if (routeTemplateId.equals(def.getId())) { - target = def; - break; - } - } - if (target == null) { - throw new IllegalArgumentException("Cannot find RouteTemplate with id " + routeTemplateId); - } - - StringJoiner templatesBuilder = new StringJoiner(", "); - final Map<String, Object> prop = new HashMap(); - // include default values first from the template (and validate that we have inputs for all required parameters) - if (target.getTemplateParameters() != null) { - for (RouteTemplateParameterDefinition temp : target.getTemplateParameters()) { - if (temp.getDefaultValue() != null) { - prop.put(temp.getName(), temp.getDefaultValue()); - } else { - // this is a required parameter do we have that as input - if (!parameters.containsKey(temp.getName())) { - templatesBuilder.add(temp.getName()); - } - } - } - } - if (templatesBuilder.length() > 0) { - throw new IllegalArgumentException( - "Route template " + routeTemplateId + " the following mandatory parameters must be provided: " - + templatesBuilder.toString()); - } - // then override with user parameters - if (parameters != null) { - prop.putAll(parameters); - } - - RouteDefinition def = target.asRouteDefinition(); - // must make deep copy of input - def.setInput(null); - def.setInput(new FromDefinition(target.getRoute().getInput().getEndpointUri())); - if (routeId != null) { - def.setId(routeId); - } - // must make the source and simk endpoints are unique by appending the route id before we create the route from the template - if (def.getInput().getEndpointUri().startsWith("kamelet:source")) { - def.getInput().setUri("kamelet:source?routeId=" + routeId); - } - Iterator<ToDefinition> it = filterTypeInOutputs(def.getOutputs(), ToDefinition.class); - while (it.hasNext()) { - ToDefinition to = it.next(); - if (to.getEndpointUri().startsWith("kamelet:sink")) { - // TODO: must make deep copy - to.setUri("kamelet:sink?routeId=" + routeId); - } - } - - - def.setTemplateParameters(prop); - context.removeRouteDefinition(def); - context.getRouteDefinitions().add(def); - - return def.getId(); - } - } } diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java index 2d6e883..c3760f3 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.camel.Category; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -30,8 +31,6 @@ import org.apache.camel.spi.UriPath; import org.apache.camel.support.DefaultEndpoint; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StopWatch; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @UriEndpoint( firstVersion = "3.5.0", @@ -39,10 +38,8 @@ import org.slf4j.LoggerFactory; syntax = "kamelet:templateId/routeId", title = "Kamelet", lenientProperties = true, - label = "camel-k") + category = Category.CORE) public class KameletEndpoint extends DefaultEndpoint { - private static final Logger LOGGER = LoggerFactory.getLogger(KameletEndpoint.class); - @Metadata(required = true) @UriPath(description = "The Route Template ID") private final String templateId; diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java index 9e6d86d..10bd42c 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java @@ -48,7 +48,7 @@ final class KameletProducer extends DefaultAsyncProducer { @Override public boolean process(Exchange exchange, AsyncCallback callback) { try { - final KameletConsumer consumer = getEndpoint().getConsumer();; + final KameletConsumer consumer = getEndpoint().getConsumer(); if (consumer != null) { return consumer.getAsyncProcessor().process(exchange, callback); diff --git a/components/camel-kamelet/src/test/resources/log4j2-test.xml b/components/camel-kamelet/src/test/resources/log4j2-test.xml index 8ce15f1..d5df1ad 100644 --- a/components/camel-kamelet/src/test/resources/log4j2-test.xml +++ b/components/camel-kamelet/src/test/resources/log4j2-test.xml @@ -32,7 +32,9 @@ <Logger name="org.apache.camel.component.kamelet" level="TRACE"/> <Root level="INFO"> + <!-- <AppenderRef ref="STDOUT"/> + --> <AppenderRef ref="FILE"/> </Root> </Loggers>