This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit 4dede2ab1d61551b675b6bde82dae236511ac07e Author: Luca Burgazzoli <lburgazz...@gmail.com> AuthorDate: Mon Sep 28 11:56:22 2020 +0200 kamelet source/sink component #490 --- components/camel-kamelet/pom.xml | 9 +- .../apache/camel/component/kamelet/Kamelet.java | 32 ++--- .../camel/component/kamelet/KameletComponent.java | 143 ++++++++++++++----- .../camel/component/kamelet/KameletConsumer.java | 74 ++++++++++ .../camel/component/kamelet/KameletEndpoint.java | 157 ++++++++++++--------- .../camel/component/kamelet/KameletProducer.java | 70 +++++++++ .../camel/component/kamelet/KameletBasicTest.java | 4 +- .../component/kamelet/KameletPropertiesTest.java | 2 +- .../camel/component/kamelet/KameletRouteTest.java | 23 +-- .../component/kamelet/KameletValidationTest.java | 2 +- .../src/test/resources/log4j2-test.xml | 2 - 11 files changed, 361 insertions(+), 157 deletions(-) diff --git a/components/camel-kamelet/pom.xml b/components/camel-kamelet/pom.xml index 21232ac..426c1f7 100644 --- a/components/camel-kamelet/pom.xml +++ b/components/camel-kamelet/pom.xml @@ -40,10 +40,6 @@ <groupId>org.apache.camel</groupId> <artifactId>camel-core-engine</artifactId> </dependency> - <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-direct</artifactId> - </dependency> <!-- ****************************** --> <!-- --> @@ -78,6 +74,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-direct</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-test-junit5</artifactId> <scope>test</scope> </dependency> diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java index d9ac81b..689c1ed 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java @@ -17,24 +17,19 @@ package org.apache.camel.component.kamelet; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.function.Predicate; import org.apache.camel.CamelContext; -import org.apache.camel.model.ModelCamelContext; -import org.apache.camel.model.RouteDefinition; import org.apache.camel.spi.PropertiesComponent; import org.apache.camel.util.StringHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public final class Kamelet { public static final String PROPERTIES_PREFIX = "camel.kamelet."; public static final String SCHEME = "kamelet"; - - private static final Logger LOGGER = LoggerFactory.getLogger(Kamelet.class); + public static final String SOURCE_ID = "source"; + public static final String SINK_ID = "sink"; private Kamelet() { } @@ -43,20 +38,11 @@ public final class Kamelet { return item -> item.startsWith(prefix); } - public static void createRouteForEndpoint(KameletEndpoint endpoint) throws Exception { - LOGGER.debug("Creating route from template {}", endpoint.getTemplateId()); - - ModelCamelContext context = endpoint.getCamelContext().adapt(ModelCamelContext.class); - String id = context.addRouteFromTemplate(endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties()); - RouteDefinition def = context.getRouteDefinition(id); - if (!def.isPrepared()) { - context.startRouteDefinitions(List.of(def)); + public static String extractTemplateId(CamelContext context, String remaining) { + if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) { + return context.resolvePropertyPlaceholders("{{templateId}}"); } - LOGGER.debug("Route {} created from template {}", id, endpoint.getTemplateId()); - } - - public static String extractTemplateId(CamelContext context, String remaining) { String answer = StringHelper.before(remaining, "/"); if (answer == null) { answer = remaining; @@ -65,7 +51,11 @@ public final class Kamelet { return answer; } - public static String extractRouteId(CamelContext context, String remaining) { + public static String extractRouteId(CamelContext context, String remaining) { + if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) { + return context.resolvePropertyPlaceholders("{{routeId}}"); + } + String answer = StringHelper.after(remaining, "/"); if (answer == null) { answer = extractTemplateId(context, remaining) + "-" + context.getUuidGenerator().generateUuid(); @@ -74,7 +64,7 @@ public final class Kamelet { return answer; } - public static Map<String, Object> extractKameletProperties(CamelContext context, String... elements) { + public static Map<String, Object> extractKameletProperties(CamelContext context, String... elements) { PropertiesComponent pc = context.getPropertiesComponent(); Map<String, Object> properties = new HashMap<>(); String prefix = Kamelet.PROPERTIES_PREFIX; diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java index 7a97d9c..91a2514 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java @@ -19,64 +19,120 @@ package org.apache.camel.component.kamelet; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.RuntimeCamelException; import org.apache.camel.VetoCamelContextStartException; +import org.apache.camel.model.ModelCamelContext; +import org.apache.camel.model.RouteDefinition; +import org.apache.camel.spi.Metadata; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.DefaultComponent; import org.apache.camel.support.LifecycleStrategySupport; +import org.apache.camel.support.service.ServiceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * The Kamelet Component provides support for interacting with <a href="https://knative.dev">Knative</a>. + * The Kamelet Component provides support for materializing routes templates. */ @Component(Kamelet.SCHEME) public class KameletComponent extends DefaultComponent { - private final LifecycleHandler lifecycleHandler; + private static final Logger LOGGER = LoggerFactory.getLogger(KameletComponent.class); - public KameletComponent() { - this(null); - } + private final Map<String, KameletConsumer> consumers; + private final LifecycleHandler lifecycleHandler; - public KameletComponent(CamelContext context) { - super(context); + @Metadata(label = "producer", defaultValue = "true") + private boolean block = true; + @Metadata(label = "producer", defaultValue = "30000") + private long timeout = 30000L; + public KameletComponent() { this.lifecycleHandler = new LifecycleHandler(); + this.consumers = new ConcurrentHashMap<>(); } @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining); final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining); + final String newUri = "kamelet:" + templateId + "/" + routeId; + + final KameletEndpoint endpoint; - // - // The properties for the kamelets are determined by global properties - // and local endpoint parameters, - // - // Global parameters are loaded in the following order: - // - // camel.kamelet." + templateId - // camel.kamelet." + templateId + "." routeId - // - Map<String, Object> kameletProperties = Kamelet.extractKameletProperties(getCamelContext(), templateId, routeId); - kameletProperties.putAll(parameters); - kameletProperties.put("templateId", templateId); - kameletProperties.put("routeId", routeId); - - // Remaining parameter should be related to the route and to avoid the - // parameters validation to fail, we need to clear the parameters map. - parameters.clear(); - - KameletEndpoint endpoint = new KameletEndpoint(uri, this, templateId, routeId, kameletProperties); - - // No parameters are expected here. - setProperties(endpoint, parameters); + if (!Kamelet.SOURCE_ID.equals(remaining) && !Kamelet.SINK_ID.equals(remaining)) { + endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers) { + @Override + protected void doInit() throws Exception { + super.doInit(); + lifecycleHandler.track(this); + } + }; + + // forward component properties + endpoint.setBlock(block); + endpoint.setTimeout(timeout); + + // set endpoint specific properties + setProperties(endpoint, parameters); + + // + // The properties for the kamelets are determined by global properties + // and local endpoint parameters, + // + // Global parameters are loaded in the following order: + // + // camel.kamelet." + templateId + // camel.kamelet." + templateId + "." routeId + // + Map<String, Object> kameletProperties = Kamelet.extractKameletProperties(getCamelContext(), templateId, routeId); + kameletProperties.putAll(parameters); + kameletProperties.put("templateId", templateId); + kameletProperties.put("routeId", routeId); + + // set kamelet specific properties + endpoint.setKameletProperties(kameletProperties); + } else { + endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers); + + // forward component properties + endpoint.setBlock(block); + endpoint.setTimeout(timeout); + + // set endpoint specific properties + setProperties(endpoint, parameters); + } return endpoint; } + public boolean isBlock() { + return block; + } + + /** + * If sending a message to a kamelet endpoint which has no active consumer, then we can tell the producer to block + * and wait for the consumer to become active. + */ + public void setBlock(boolean block) { + this.block = block; + } + + public long getTimeout() { + return timeout; + } + + /** + * The timeout value to use if block is enabled. + */ + public void setTimeout(long timeout) { + this.timeout = timeout; + } + @Override protected void doInit() throws Exception { getCamelContext().addLifecycleStrategy(lifecycleHandler); @@ -91,11 +147,11 @@ public class KameletComponent extends DefaultComponent { @Override protected void doStop() throws Exception { getCamelContext().getLifecycleStrategies().remove(lifecycleHandler); - super.doStop(); - } - void onEndpointAdd(KameletEndpoint endpoint) { - lifecycleHandler.track(endpoint); + ServiceHelper.stopService(consumers.values()); + consumers.clear(); + + super.doStop(); } /* @@ -118,9 +174,11 @@ public class KameletComponent extends DefaultComponent { @Override public void onContextInitialized(CamelContext context) throws VetoCamelContextStartException { if (!this.initialized.compareAndExchange(false, true)) { + ModelCamelContext mcc = context.adapt(ModelCamelContext.class); + for (KameletEndpoint endpoint : endpoints) { try { - Kamelet.createRouteForEndpoint(endpoint); + createRouteForEndpoint(endpoint); } catch (Exception e) { throw new VetoCamelContextStartException("Failure creating route from template: " + endpoint.getTemplateId(), e, context); } @@ -137,13 +195,28 @@ public class KameletComponent extends DefaultComponent { public void track(KameletEndpoint endpoint) { if (this.initialized.get()) { try { - Kamelet.createRouteForEndpoint(endpoint); + createRouteForEndpoint(endpoint); } catch (Exception e) { throw RuntimeCamelException.wrapRuntimeException(e); } } else { + LOGGER.debug("Tracking route template={} and id={}", endpoint.getTemplateId(), endpoint.getRouteId()); this.endpoints.add(endpoint); } } + + public static void createRouteForEndpoint(KameletEndpoint endpoint) throws Exception { + LOGGER.debug("Creating route from template={} and id={}", endpoint.getTemplateId(), endpoint.getRouteId()); + + final ModelCamelContext context = endpoint.getCamelContext().adapt(ModelCamelContext.class); + final String id = context.addRouteFromTemplate(endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties()); + final RouteDefinition def = context.getRouteDefinition(id); + + if (!def.isPrepared()) { + context.startRouteDefinitions(List.of(def)); + } + + LOGGER.debug("Route with id={} created from template={}", id, endpoint.getTemplateId()); + } } } diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java new file mode 100644 index 0000000..c99d56c --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet; + +import org.apache.camel.Processor; +import org.apache.camel.ShutdownRunningTask; +import org.apache.camel.Suspendable; +import org.apache.camel.spi.ShutdownAware; +import org.apache.camel.support.DefaultConsumer; + +final class KameletConsumer extends DefaultConsumer implements ShutdownAware, Suspendable { + public KameletConsumer(KameletEndpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + @Override + public KameletEndpoint getEndpoint() { + return (KameletEndpoint)super.getEndpoint(); + } + + @Override + protected void doStart() throws Exception { + getEndpoint().addConsumer(this); + } + + @Override + protected void doStop() throws Exception { + getEndpoint().removeConsumer(this); + } + + @Override + protected void doSuspend() throws Exception { + getEndpoint().removeConsumer(this); + } + + @Override + protected void doResume() throws Exception { + getEndpoint().addConsumer(this); + } + + @Override + public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { + // deny stopping on shutdown as we want kamelet consumers to run in + // case some other queues depend on this consumer to run, so it can + // complete its exchanges + return true; + } + + @Override + public int getPendingExchangesSize() { + // return 0 as we do not have an internal memory queue with a variable + // size of inflight messages. + return 0; + } + + @Override + public void prepareShutdown(boolean suspendOnly, boolean forced) { + // noop + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java index 3209c6d..80d8e10 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java @@ -17,23 +17,21 @@ package org.apache.camel.component.kamelet; import java.util.Collections; +import java.util.HashMap; import java.util.Map; -import org.apache.camel.AsyncCallback; -import org.apache.camel.AsyncProducer; import org.apache.camel.Consumer; -import org.apache.camel.Endpoint; -import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; -import org.apache.camel.support.DefaultAsyncProducer; -import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.DefaultEndpoint; -import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @UriEndpoint( firstVersion = "3.5.0", @@ -43,34 +41,67 @@ import org.apache.camel.util.ObjectHelper; lenientProperties = true, label = "camel-k") public class KameletEndpoint extends DefaultEndpoint { + private static final Logger LOGGER = LoggerFactory.getLogger(KameletEndpoint.class); + @Metadata(required = true) @UriPath(description = "The Route Template ID") private final String templateId; - @Metadata(required = false) @UriPath(description = "The Route ID", defaultValueNote = "The ID will be auto-generated if not provided") private final String routeId; + @UriParam(label = "producer", defaultValue = "true") + private boolean block = true; + @UriParam(label = "producer", defaultValue = "30000") + private long timeout = 30000L; + @UriParam(label = "producer", defaultValue = "true") + private final Map<String, Object> kameletProperties; - private final String kameletUri; + private final Map<String, KameletConsumer> consumers; + private final String key; public KameletEndpoint( String uri, KameletComponent component, String templateId, String routeId, - Map<String, Object> kameletProperties) { + Map<String, KameletConsumer> consumers) { super(uri, component); ObjectHelper.notNull(templateId, "template id"); ObjectHelper.notNull(routeId, "route id"); - ObjectHelper.notNull(kameletProperties, "kamelet properties"); this.templateId = templateId; this.routeId = routeId; - this.kameletProperties = Collections.unmodifiableMap(kameletProperties); - this.kameletUri = "direct:" + routeId; + this.key = templateId + "/" + routeId; + this.kameletProperties = new HashMap<>(); + this.consumers = consumers; + } + + public boolean isBlock() { + return block; + } + + /** + * If sending a message to a direct endpoint which has no active consumer, then we can tell the producer to block + * and wait for the consumer to become active. + */ + public void setBlock(boolean block) { + this.block = block; + } + + public long getTimeout() { + return timeout; + } + + /** + * The timeout value to use if block is enabled. + * + * @param timeout the timeout value + */ + public void setTimeout(long timeout) { + this.timeout = timeout; } @Override @@ -83,6 +114,11 @@ public class KameletEndpoint extends DefaultEndpoint { return true; } + @Override + public boolean isSingleton() { + return true; + } + public String getTemplateId() { return templateId; } @@ -91,88 +127,71 @@ public class KameletEndpoint extends DefaultEndpoint { return routeId; } + public void setKameletProperties(Map<String, Object> kameletProperties) { + if (kameletProperties != null) { + this.kameletProperties.clear(); + this.kameletProperties.putAll(kameletProperties); + } + } + public Map<String, Object> getKameletProperties() { - return kameletProperties; + return Collections.unmodifiableMap(kameletProperties); } @Override public Producer createProducer() throws Exception { - return new KameletProducer(); + return new KameletProducer(this); } @Override public Consumer createConsumer(Processor processor) throws Exception { - Consumer answer = new KemeletConsumer(processor); + Consumer answer = new KameletConsumer(this, processor); configureConsumer(answer); return answer; } - @Override - protected void doInit() throws Exception { - super.doInit(); - getComponent().onEndpointAdd(this); - } - // ********************************* // // Helpers // // ********************************* - private class KemeletConsumer extends DefaultConsumer { - private volatile Endpoint endpoint; - private volatile Consumer consumer; - - public KemeletConsumer(Processor processor) { - super(KameletEndpoint.this, processor); - } - - @Override - protected void doStart() throws Exception { - endpoint = getCamelContext().getEndpoint(kameletUri); - consumer = endpoint.createConsumer(getProcessor()); - - ServiceHelper.startService(endpoint, consumer); - super.doStart(); - } - - @Override - protected void doStop() throws Exception { - ServiceHelper.stopService(consumer, endpoint); - super.doStop(); + void addConsumer(KameletConsumer consumer) { + synchronized (consumers) { + if (consumers.putIfAbsent(key, consumer) != null) { + throw new IllegalArgumentException( + "Cannot add a 2nd consumer to the same endpoint. Endpoint " + this + " only allows one consumer."); + } + consumers.notifyAll(); } } - private class KameletProducer extends DefaultAsyncProducer { - private volatile Endpoint endpoint; - private volatile AsyncProducer producer; - - public KameletProducer() { - super(KameletEndpoint.this); + void removeConsumer(KameletConsumer consumer) { + synchronized (consumers) { + consumers.remove(key, consumer); + consumers.notifyAll(); } + } - @Override - public boolean process(Exchange exchange, AsyncCallback callback) { - if (producer != null) { - return producer.process(exchange, callback); - } else { - callback.done(true); - return true; + KameletConsumer getConsumer() throws InterruptedException { + synchronized (consumers) { + KameletConsumer answer = consumers.get(key); + if (answer == null && block) { + StopWatch watch = new StopWatch(); + for (; ; ) { + answer =consumers.get(key); + if (answer != null) { + break; + } + long rem = timeout - watch.taken(); + if (rem <= 0) { + break; + } + consumers.wait(rem); + } } - } - - @Override - protected void doStart() throws Exception { - endpoint = getCamelContext().getEndpoint(kameletUri); - producer = endpoint.createAsyncProducer(); - ServiceHelper.startService(endpoint, producer); - super.doStart(); - } - @Override - protected void doStop() throws Exception { - ServiceHelper.stopService(producer, endpoint); - super.doStop(); + return answer; } } } \ No newline at end of file diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java new file mode 100644 index 0000000..9e6d86d --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelExchangeException; +import org.apache.camel.Exchange; +import org.apache.camel.support.DefaultAsyncProducer; + +final class KameletProducer extends DefaultAsyncProducer { + public KameletProducer(KameletEndpoint endpoint) { + super(endpoint); + } + + @Override + public KameletEndpoint getEndpoint() { + return (KameletEndpoint)super.getEndpoint(); + } + + @Override + public void process(Exchange exchange) throws Exception { + final KameletConsumer consumer = getEndpoint().getConsumer(); + + if (consumer != null) { + consumer.getProcessor().process(exchange); + } else { + exchange.setException( + new CamelExchangeException( + "No consumers available on endpoint: " + getEndpoint(), exchange) + ); + } + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + try { + final KameletConsumer consumer = getEndpoint().getConsumer();; + + if (consumer != null) { + return consumer.getAsyncProcessor().process(exchange, callback); + } else { + exchange.setException( + new CamelExchangeException( + "No consumers available on endpoint: " + getEndpoint(), exchange) + ); + + callback.done(true); + return true; + } + } catch (Exception e) { + exchange.setException(e); + callback.done(true); + return true; + } + } +} diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java index e023e07..5826f21 100644 --- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java @@ -78,13 +78,13 @@ public class KameletBasicTest extends CamelTestSupport { public void configure() throws Exception { routeTemplate("setBody") .templateParameter("bodyValue") - .from("direct:{{routeId}}") + .from("kamelet:source") .setBody().constant("{{bodyValue}}"); routeTemplate("tick") .from("timer:{{routeId}}?repeatCount=1&delay=-1") .setBody().exchangeProperty(Exchange.TIMER_COUNTER) - .to("direct:{{routeId}}"); + .to("kamelet:sink"); from("direct:templateEmbedded") .toF("kamelet:setBody/embedded?bodyValue=embedded"); diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java index d33a15b..67d6ff5 100644 --- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java @@ -78,7 +78,7 @@ public class KameletPropertiesTest extends CamelTestSupport { // template routeTemplate("setBody") .templateParameter("bodyValue") - .from("direct:{{routeId}}") + .from("kamelet:source") .setBody().constant("{{bodyValue}}"); } }; diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java index 8aed313..7e8c345 100644 --- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java @@ -18,16 +18,13 @@ package org.apache.camel.component.kamelet; import java.util.UUID; -import org.apache.camel.CamelExecutionException; import org.apache.camel.RoutesBuilder; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.direct.DirectConsumerNotAvailableException; import org.apache.camel.test.junit5.CamelTestSupport; import org.apache.http.annotation.Obsolete; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; public class KameletRouteTest extends CamelTestSupport { @Test @@ -48,15 +45,6 @@ public class KameletRouteTest extends CamelTestSupport { ).isEqualTo("b-a-" + body); } - @Test - public void testFailure() { - String body = UUID.randomUUID().toString(); - - assertThatExceptionOfType(CamelExecutionException.class) - .isThrownBy(() -> fluentTemplate.toF("direct:fail").withBody(body).request(String.class)) - .withCauseExactlyInstanceOf(DirectConsumerNotAvailableException.class); - } - // ********************************************** // // test set-up @@ -70,12 +58,7 @@ public class KameletRouteTest extends CamelTestSupport { public void configure() throws Exception { routeTemplate("echo") .templateParameter("prefix") - .from("direct:{{routeId}}") - .setBody().simple("{{prefix}}-${body}"); - - routeTemplate("echo-fail") - .templateParameter("prefix") - .from("direct:#property:routeId") + .from("kamelet:source") .setBody().simple("{{prefix}}-${body}"); from("direct:single") @@ -86,10 +69,6 @@ public class KameletRouteTest extends CamelTestSupport { .to("kamelet:echo/1?prefix=a") .to("kamelet:echo/2?prefix=b") .log("${body}"); - - from("direct:fail") - .to("kamelet:echo-fail?prefix=a") - .log("${body}"); } }; } diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java index e16cf10..f35c8e3 100644 --- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java @@ -34,7 +34,7 @@ public class KameletValidationTest { public void configure() throws Exception { routeTemplate("setBody") .templateParameter("bodyValue") - .from("direct:{{routeId}}") + .from("kamelet:source") .setBody().constant("{{bodyValue}}"); from("direct:start") diff --git a/components/camel-kamelet/src/test/resources/log4j2-test.xml b/components/camel-kamelet/src/test/resources/log4j2-test.xml index d5df1ad..8ce15f1 100644 --- a/components/camel-kamelet/src/test/resources/log4j2-test.xml +++ b/components/camel-kamelet/src/test/resources/log4j2-test.xml @@ -32,9 +32,7 @@ <Logger name="org.apache.camel.component.kamelet" level="TRACE"/> <Root level="INFO"> - <!-- <AppenderRef ref="STDOUT"/> - --> <AppenderRef ref="FILE"/> </Root> </Loggers>