This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 20832497e8d1b5b622c203b5927cf5e02e88a591
Author: lburgazzoli <lburgazz...@gmail.com>
AuthorDate: Fri May 15 14:21:07 2020 +0200

    Improve knative broker integration #326
---
 camel-knative/camel-knative-http/pom.xml           |   5 -
 .../component/knative/http/KnativeHttpSupport.java |  30 +-
 .../knative/http/KnativeHttpTransport.java         |   8 +-
 .../component/knative/http/KnativeHttpServer.java  | 214 +++++++
 .../component/knative/http/KnativeHttpTest.java    | 656 +++++++++------------
 .../knative/ce/AbstractCloudEventProcessor.java    |  32 +-
 6 files changed, 563 insertions(+), 382 deletions(-)

diff --git a/camel-knative/camel-knative-http/pom.xml 
b/camel-knative/camel-knative-http/pom.xml
index abd18b0..12f6b5d 100644
--- a/camel-knative/camel-knative-http/pom.xml
+++ b/camel-knative/camel-knative-http/pom.xml
@@ -94,11 +94,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-undertow</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.camel</groupId>
             <artifactId>camel-http</artifactId>
             <scope>test</scope>
         </dependency>
diff --git 
a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
 
b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
index f113e78..36ef1ca 100644
--- 
a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
+++ 
b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
 import io.vertx.core.http.HttpServerRequest;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.knative.spi.CloudEvent;
 import org.apache.camel.component.knative.spi.Knative;
@@ -101,10 +102,37 @@ public final class KnativeHttpSupport {
             @Override
             public boolean process(Exchange exchange, AsyncCallback callback) {
                 return processor.process(exchange, doneSync -> {
+                    final Message message = exchange.getMessage();
+
                     // remove CloudEvent headers
                     for (CloudEvent.Attribute attr : ce.attributes()) {
-                        exchange.getMessage().removeHeader(attr.http());
+                        message.removeHeader(attr.http());
+                    }
+
+                    callback.done(doneSync);
+                });
+            }
+        };
+    }
+
+    /**
+     * Remap camel headers to cloud event http headers.
+     */
+    public static Processor remalCloudEventHeaders(Processor delegate, 
CloudEvent ce) {
+        return new DelegateAsyncProcessor(delegate) {
+            @Override
+            public boolean process(Exchange exchange, AsyncCallback callback) {
+                return processor.process(exchange, doneSync -> {
+                    final Message message = exchange.getMessage();
+
+                    // remap CloudEvent camel --> http
+                    for (CloudEvent.Attribute attr : ce.attributes()) {
+                        Object value = message.getHeader(attr.id());
+                        if (value != null) {
+                            message.setHeader(attr.http(), value);
+                        }
                     }
+
                     callback.done(doneSync);
                 });
             }
diff --git 
a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
 
b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
index 9e7eb51..cdd7c8b 100644
--- 
a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
+++ 
b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
@@ -32,7 +32,7 @@ import org.apache.camel.support.service.ServiceSupport;
 public class KnativeHttpTransport extends ServiceSupport implements 
CamelContextAware, KnativeTransport {
     public static final int DEFAULT_PORT = 8080;
     public static final String DEFAULT_PATH = "/";
-    
+
     private PlatformHttp platformHttp;
     private WebClientOptions vertxHttpClientOptions;
     private CamelContext camelContext;
@@ -96,10 +96,12 @@ public class KnativeHttpTransport extends ServiceSupport 
implements CamelContext
 
     @Override
     public Consumer createConsumer(Endpoint endpoint, 
KnativeTransportConfiguration config, 
KnativeEnvironment.KnativeServiceDefinition service, Processor processor) {
-        Processor next = processor;
+        Processor next = KnativeHttpSupport.remalCloudEventHeaders(processor, 
config.getCloudEvent());
+
         if (config.isRemoveCloudEventHeadersInReply()) {
-            next = KnativeHttpSupport.withoutCloudEventHeaders(processor, 
config.getCloudEvent());
+            next = KnativeHttpSupport.withoutCloudEventHeaders(next, 
config.getCloudEvent());
         }
+
         return new KnativeHttpConsumer(this, endpoint, service, 
this.platformHttp, next);
     }
 
diff --git 
a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java
 
b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java
new file mode 100644
index 0000000..cf3a507
--- /dev/null
+++ 
b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.BodyHandler;
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.platform.http.PlatformHttpConstants;
+import org.apache.camel.support.service.ServiceSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KnativeHttpServer extends ServiceSupport {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(KnativeHttpServer.class);
+
+    private final CamelContext context;
+    private final String host;
+    private final int port;
+    private final String path;
+
+    private Vertx vertx;
+    private Router router;
+    private ExecutorService executor;
+    private HttpServer server;
+    private BlockingQueue<HttpServerRequest> requests;
+    private Handler<RoutingContext> handler;
+
+    public KnativeHttpServer(CamelContext context, int port) {
+        this(context, "localhost", port, "/", null);
+    }
+
+    public KnativeHttpServer(CamelContext context, int port, 
Handler<RoutingContext> handler) {
+        this(context, "localhost", port, "/", handler);
+    }
+
+    public KnativeHttpServer(CamelContext context, String host, int port, 
String path) {
+        this(context, host, port, path, null);
+    }
+
+    public KnativeHttpServer(CamelContext context, String host, int port, 
String path, Handler<RoutingContext> handler) {
+        this.context = context;
+        this.host = host;
+        this.port = port;
+        this.path = path;
+        this.requests = new LinkedBlockingQueue<>();
+        this.handler = handler != null
+            ? handler
+            : event -> {
+                event.response().setStatusCode(200);
+                event.response().end();
+            };
+    }
+
+    public HttpServerRequest poll(int timeout, TimeUnit unit) throws 
InterruptedException {
+        return requests.poll(timeout, unit);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        this.executor = 
context.getExecutorServiceManager().newSingleThreadExecutor(this, 
"knative-http-server");
+        this.vertx = Vertx.vertx();
+        this.server = vertx.createHttpServer();
+        this.router = Router.router(vertx);
+        this.router.route(path)
+            .handler(event -> {
+                event.request().resume();
+                BodyHandler.create().handle(event);
+            })
+            .handler(event -> {
+                this.requests.offer(event.request());
+                event.next();
+            })
+            .handler(handler);
+
+        CompletableFuture.runAsync(
+            () -> {
+                CountDownLatch latch = new CountDownLatch(1);
+                server.requestHandler(router).listen(port, host, result -> {
+                    try {
+                        if (result.failed()) {
+                            LOGGER.warn("Failed to start Vert.x HttpServer on 
{}:{}, reason: {}",
+                                host,
+                                port,
+                                result.cause().getMessage()
+                            );
+
+                            throw new RuntimeException(result.cause());
+                        }
+
+                        LOGGER.info("Vert.x HttpServer started on {}:{}", 
host, port);
+                    } finally {
+                        latch.countDown();
+                    }
+                });
+
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            },
+            executor
+        ).toCompletableFuture().join();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        try {
+            if (server != null) {
+                CompletableFuture.runAsync(
+                    () -> {
+                        CountDownLatch latch = new CountDownLatch(1);
+
+                        // remove the platform-http component
+                        
context.removeComponent(PlatformHttpConstants.PLATFORM_HTTP_COMPONENT_NAME);
+
+                        server.close(result -> {
+                            try {
+                                if (result.failed()) {
+                                    LOGGER.warn("Failed to close Vert.x 
HttpServer reason: {}",
+                                        result.cause().getMessage()
+                                    );
+
+                                    throw new RuntimeException(result.cause());
+                                }
+
+                                LOGGER.info("Vert.x HttpServer stopped");
+                            } finally {
+                                latch.countDown();
+                            }
+                        });
+
+                        try {
+                            latch.await();
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    },
+                    executor
+                ).toCompletableFuture().join();
+            }
+        } finally {
+            this.server = null;
+        }
+
+        if (vertx != null) {
+            Future<?> future = executor.submit(
+                () -> {
+                    CountDownLatch latch = new CountDownLatch(1);
+
+                    vertx.close(result -> {
+                        try {
+                            if (result.failed()) {
+                                LOGGER.warn("Failed to close Vert.x reason: 
{}",
+                                    result.cause().getMessage()
+                                );
+
+                                throw new RuntimeException(result.cause());
+                            }
+
+                            LOGGER.info("Vert.x stopped");
+                        } finally {
+                            latch.countDown();
+                        }
+                    });
+
+                    try {
+                        latch.await();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            );
+
+            try {
+                future.get();
+            } finally {
+                vertx = null;
+            }
+        }
+
+        if (executor != null) {
+            context.getExecutorServiceManager().shutdown(executor);
+            executor = null;
+        }
+    }
+}
diff --git 
a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
 
b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index 657a74e..e0d6612 100644
--- 
a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ 
b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -23,15 +23,12 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Random;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import io.restassured.RestAssured;
 import io.restassured.mapper.ObjectMapperType;
-import io.undertow.Undertow;
-import io.undertow.server.HttpServerExchange;
-import io.undertow.util.HeaderMap;
+import io.vertx.core.http.HttpServerRequest;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
@@ -117,7 +114,7 @@ public class KnativeHttpTest {
     void doTestKnativeSource(CloudEvent ce, String basePath, String path) 
throws Exception {
         KnativeComponent component = configureKnativeComponent(
             context,
-            CloudEvents.V03,
+            ce,
             sourceEndpoint(
                 "myEndpoint",
                 mapOf(
@@ -131,23 +128,20 @@ public class KnativeHttpTest {
             component.getConfiguration().addTransportOptions("basePath", 
basePath);
         }
 
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("knative:endpoint/myEndpoint")
-                    .to("mock:ce");
-            }
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("knative:endpoint/myEndpoint")
+                .to("mock:ce");
         });
 
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(),
 ce.version());
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(),
 "org.apache.camel.event");
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(),
 "/somewhere");
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, 
ce.version());
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, 
"org.apache.camel.event");
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, 
"/somewhere");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http()));
-        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http()));
+        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
+        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_ID));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -214,14 +208,11 @@ public class KnativeHttpTest {
                 ))
         );
 
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:source")
-                    .to("knative:endpoint/myEndpoint");
-                from("platform-http:/a/path")
-                    .to("mock:ce");
-            }
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("direct:source")
+                .to("knative:endpoint/myEndpoint");
+            b.from("platform-http:/a/path")
+                .to("mock:ce");
         });
 
         context.start();
@@ -236,7 +227,7 @@ public class KnativeHttpTest {
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
-        context.createProducerTemplate().sendBody("direct:source", "test");
+        template.sendBody("direct:source", "test");
 
         mock.assertIsSatisfied();
     }
@@ -255,23 +246,20 @@ public class KnativeHttpTest {
                 ))
         );
 
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("knative:endpoint/myEndpoint")
-                    .to("mock:ce");
-            }
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("knative:endpoint/myEndpoint")
+                .to("mock:ce");
         });
 
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(),
 ce.version());
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(),
 "org.apache.camel.event");
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(),
 "myEventID");
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(),
 "/somewhere");
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, 
ce.version());
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, 
"org.apache.camel.event");
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, 
"myEventID");
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, 
"/somewhere");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
+        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -280,19 +268,19 @@ public class KnativeHttpTest {
                 .contentType(Knative.MIME_STRUCTURED_CONTENT_MODE)
                 .body(
                     mapOf(
-                            "cloudEventsVersion", ce.version(),
-                            "eventType", "org.apache.camel.event",
-                            "eventID", "myEventID",
-                            "eventTime", 
DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
-                            "source", "/somewhere",
-                            "contentType", "text/plain",
-                            "data", "test"
+                        "cloudEventsVersion", ce.version(),
+                        "eventType", "org.apache.camel.event",
+                        "eventID", "myEventID",
+                        "eventTime", 
DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
+                        "source", "/somewhere",
+                        "contentType", "text/plain",
+                        "data", "test"
                     ),
                     ObjectMapperType.JACKSON_2
                 )
-            .when()
+                .when()
                 .post()
-            .then()
+                .then()
                 .statusCode(200);
         } else if (Objects.equals(CloudEvents.V02.version(), ce.version())) {
             given()
@@ -309,9 +297,9 @@ public class KnativeHttpTest {
                     ),
                     ObjectMapperType.JACKSON_2
                 )
-            .when()
+                .when()
                 .post()
-            .then()
+                .then()
                 .statusCode(200);
         } else if (Objects.equals(CloudEvents.V03.version(), ce.version())) {
             given()
@@ -328,9 +316,9 @@ public class KnativeHttpTest {
                     ),
                     ObjectMapperType.JACKSON_2
                 )
-            .when()
+                .when()
                 .post()
-            .then()
+                .then()
                 .statusCode(200);
         } else {
             throw new IllegalArgumentException("Unknown CloudEvent spec: " + 
ce.version());
@@ -353,28 +341,20 @@ public class KnativeHttpTest {
                 ))
         );
 
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("knative:endpoint/myEndpoint")
-                    .to("mock:ce");
-            }
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("knative:endpoint/myEndpoint")
+                .to("mock:ce");
         });
 
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(),
 ce.version());
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(),
 ce.version());
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(),
 "org.apache.camel.event");
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(),
 "org.apache.camel.event");
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(),
 "myEventID");
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(),
 "myEventID");
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(),
 "/somewhere");
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(),
 "/somewhere");
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, 
ce.version());
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, 
"org.apache.camel.event");
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, 
"myEventID");
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, 
"/somewhere");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
-        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http()));
+        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -416,37 +396,34 @@ public class KnativeHttpTest {
                 ))
         );
 
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("knative:endpoint/ep1")
-                    .convertBodyTo(String.class)
-                    .to("log:ce1?showAll=true&multiline=true")
-                    .to("mock:ce1");
-                from("knative:endpoint/ep2")
-                    .convertBodyTo(String.class)
-                    .to("log:ce2?showAll=true&multiline=true")
-                    .to("mock:ce2");
-            }
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("knative:endpoint/ep1")
+                .convertBodyTo(String.class)
+                .to("log:ce1?showAll=true&multiline=true")
+                .to("mock:ce1");
+            b.from("knative:endpoint/ep2")
+                .convertBodyTo(String.class)
+                .to("log:ce2?showAll=true&multiline=true")
+                .to("mock:ce2");
         });
 
         context.start();
 
         MockEndpoint mock1 = context.getEndpoint("mock:ce1", 
MockEndpoint.class);
-        mock1.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
-        
mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(),
 ce.version());
-        
mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(),
 "org.apache.camel.event");
-        
mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(),
 "myEventID1");
-        
mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(),
 "CE1");
+        mock1.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
+        mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, 
ce.version());
+        mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, 
"org.apache.camel.event");
+        mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, 
"myEventID1");
+        mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, 
"CE1");
         mock1.expectedBodiesReceived("test");
         mock1.expectedMessageCount(1);
 
         MockEndpoint mock2 = context.getEndpoint("mock:ce2", 
MockEndpoint.class);
-        mock2.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
-        
mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(),
 ce.version());
-        
mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(),
 "org.apache.camel.event");
-        
mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(),
 "myEventID2");
-        
mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(),
 "CE2");
+        mock2.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
+        mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, 
ce.version());
+        mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, 
"org.apache.camel.event");
+        mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, 
"myEventID2");
+        mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, 
"CE2");
         mock2.expectedBodiesReceived("test");
         mock2.expectedMessageCount(1);
 
@@ -458,9 +435,9 @@ public class KnativeHttpTest {
             
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), 
"myEventID1")
             
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), 
DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
             
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), 
"CE1")
-        .when()
+            .when()
             .post()
-        .then()
+            .then()
             .statusCode(200);
 
         given()
@@ -502,37 +479,34 @@ public class KnativeHttpTest {
                 ))
         );
 
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("knative:endpoint/ep1")
-                    .convertBodyTo(String.class)
-                    .to("log:ce1?showAll=true&multiline=true")
-                    .to("mock:ce1");
-                from("knative:endpoint/ep2")
-                    .convertBodyTo(String.class)
-                    .to("log:ce2?showAll=true&multiline=true")
-                    .to("mock:ce2");
-            }
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("knative:endpoint/ep1")
+                .convertBodyTo(String.class)
+                .to("log:ce1?showAll=true&multiline=true")
+                .to("mock:ce1");
+            b.from("knative:endpoint/ep2")
+                .convertBodyTo(String.class)
+                .to("log:ce2?showAll=true&multiline=true")
+                .to("mock:ce2");
         });
 
         context.start();
 
         MockEndpoint mock1 = context.getEndpoint("mock:ce1", 
MockEndpoint.class);
-        mock1.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
-        
mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(),
 ce.version());
-        
mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(),
 "org.apache.camel.event");
-        
mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(),
 "myEventID1");
-        
mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(),
 "CE0");
+        mock1.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
+        mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, 
ce.version());
+        mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, 
"org.apache.camel.event");
+        mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, 
"myEventID1");
+        mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, 
"CE0");
         mock1.expectedBodiesReceived("test");
         mock1.expectedMessageCount(1);
 
         MockEndpoint mock2 = context.getEndpoint("mock:ce2", 
MockEndpoint.class);
-        mock2.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
-        
mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(),
 ce.version());
-        
mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(),
 "org.apache.camel.event");
-        
mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(),
 "myEventID2");
-        
mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(),
 "CE5");
+        mock2.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
+        mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, 
ce.version());
+        mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, 
"org.apache.camel.event");
+        mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, 
"myEventID2");
+        mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, 
"CE5");
         mock2.expectedBodiesReceived("test");
         mock2.expectedMessageCount(1);
 
@@ -575,37 +549,34 @@ public class KnativeHttpTest {
             sourceEvent("default")
         );
 
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("knative:event/event1")
-                    .convertBodyTo(String.class)
-                    .to("log:ce1?showAll=true&multiline=true")
-                    .to("mock:ce1");
-                from("knative:event/event2")
-                    .convertBodyTo(String.class)
-                    .to("log:ce2?showAll=true&multiline=true")
-                    .to("mock:ce2");
-            }
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("knative:event/event1")
+                .convertBodyTo(String.class)
+                .to("log:ce1?showAll=true&multiline=true")
+                .to("mock:ce1");
+            b.from("knative:event/event2")
+                .convertBodyTo(String.class)
+                .to("log:ce2?showAll=true&multiline=true")
+                .to("mock:ce2");
         });
 
         context.start();
 
         MockEndpoint mock1 = context.getEndpoint("mock:ce1", 
MockEndpoint.class);
-        mock1.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
-        
mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(),
 ce.version());
-        
mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(),
 "event1");
-        
mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(),
 "myEventID1");
-        
mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(),
 "CE1");
+        mock1.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
+        mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, 
ce.version());
+        mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, 
"event1");
+        mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, 
"myEventID1");
+        mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, 
"CE1");
         mock1.expectedBodiesReceived("test");
         mock1.expectedMessageCount(1);
 
         MockEndpoint mock2 = context.getEndpoint("mock:ce2", 
MockEndpoint.class);
-        mock2.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
-        
mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(),
 ce.version());
-        
mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(),
 "event2");
-        
mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(),
 "myEventID2");
-        
mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(),
 "CE2");
+        mock2.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
+        mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, 
ce.version());
+        mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, 
"event2");
+        mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, 
"myEventID2");
+        mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, 
"CE2");
         mock2.expectedBodiesReceived("test");
         mock2.expectedMessageCount(1);
 
@@ -662,20 +633,17 @@ public class KnativeHttpTest {
                 ))
         );
 
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("knative:endpoint/from")
-                    .convertBodyTo(String.class)
-                    .setBody()
-                        .constant("consumer")
-                    
.setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())
-                        .constant("custom");
-                from("direct:source")
-                    .to("knative://endpoint/to")
-                    .log("${body}")
-                    .to("mock:to");
-            }
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("knative:endpoint/from")
+                .convertBodyTo(String.class)
+                .setBody()
+                .constant("consumer")
+                .setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE)
+                .constant("custom");
+            b.from("direct:source")
+                .to("knative://endpoint/to")
+                .log("${body}")
+                .to("mock:to");
         });
 
         MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
@@ -684,7 +652,7 @@ public class KnativeHttpTest {
         mock.expectedMessageCount(1);
 
         context.start();
-        context.createProducerTemplate().sendBody("direct:source", "");
+        template.sendBody("direct:source", "");
 
         mock.assertIsSatisfied();
     }
@@ -712,20 +680,17 @@ public class KnativeHttpTest {
                 ))
         );
 
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("knative:endpoint/from?replyWithCloudEvent=true")
-                    .convertBodyTo(String.class)
-                    .setBody()
-                        .constant("consumer")
-                    
.setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())
-                        .constant("custom");
-                from("direct:source")
-                    .to("knative://endpoint/to")
-                    .log("${body}")
-                    .to("mock:to");
-            }
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("knative:endpoint/from?replyWithCloudEvent=true")
+                .convertBodyTo(String.class)
+                .setBody()
+                .constant("consumer")
+                .setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE)
+                .constant("custom");
+            b.from("direct:source")
+                .to("knative://endpoint/to")
+                .log("${body}")
+                .to("mock:to");
         });
 
         MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
@@ -734,7 +699,7 @@ public class KnativeHttpTest {
         mock.expectedMessageCount(1);
 
         context.start();
-        context.createProducerTemplate().sendBody("direct:source", "");
+        template.sendBody("direct:source", "");
 
         mock.assertIsSatisfied();
     }
@@ -968,28 +933,25 @@ public class KnativeHttpTest {
                 ))
         );
 
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:source")
-                    .to("knative:event/myEvent");
-                fromF("knative:event/myEvent")
-                    .to("mock:ce");
-            }
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("direct:source")
+                .to("knative:event/myEvent");
+            b.from("knative:event/myEvent")
+                .to("mock:ce");
         });
 
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(),
 ce.version());
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(),
 "myEvent");
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, 
ce.version());
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, 
"myEvent");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
-        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id()));
+        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
+        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_ID));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
-        context.createProducerTemplate().sendBody("direct:source", "test");
+        template.sendBody("direct:source", "test");
 
         mock.assertIsSatisfied();
     }
@@ -1021,28 +983,25 @@ public class KnativeHttpTest {
                 ))
         );
 
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:source")
-                    .to("knative:event/myEvent?kind=MyObject&apiVersion=v1");
-                from("knative:event/myEvent?kind=MyOtherObject&apiVersion=v2")
-                    .to("mock:ce");
-            }
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("direct:source")
+                .to("knative:event/myEvent?kind=MyObject&apiVersion=v1");
+            b.from("knative:event/myEvent?kind=MyOtherObject&apiVersion=v2")
+                .to("mock:ce");
         });
 
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(),
 ce.version());
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(),
 "myEvent");
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, 
ce.version());
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, 
"myEvent");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
-        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id()));
+        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
+        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_ID));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
-        context.createProducerTemplate().sendBody("direct:source", "test");
+        template.sendBody("direct:source", "test");
 
         mock.assertIsSatisfied();
     }
@@ -1071,23 +1030,20 @@ public class KnativeHttpTest {
                 ))
         );
 
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("knative:endpoint/myEndpoint?kind=MyObject&apiVersion=v2")
-                    .to("mock:ce");
-            }
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("knative:endpoint/myEndpoint?kind=MyObject&apiVersion=v2")
+                .to("mock:ce");
         });
 
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(),
 ce.version());
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(),
 "org.apache.camel.event");
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(),
 "myEventID");
-        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(),
 "/somewhere");
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, 
ce.version());
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, 
"org.apache.camel.event");
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, 
"myEventID");
+        mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, 
"/somewhere");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
+        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -1121,12 +1077,9 @@ public class KnativeHttpTest {
                 ))
         );
 
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("knative:endpoint/myEndpoint")
-                    .to("mock:ce");
-            }
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("knative:endpoint/myEndpoint")
+                .to("mock:ce");
         });
 
         context.start();
@@ -1134,9 +1087,9 @@ public class KnativeHttpTest {
         given()
             .body("test")
             .header(Exchange.CONTENT_TYPE, "text/plain")
-        .when()
+            .when()
             .get()
-        .then()
+            .then()
             .statusCode(404);
     }
 
@@ -1157,12 +1110,9 @@ public class KnativeHttpTest {
                 ))
         );
 
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:start")
-                    .to("knative:endpoint/myEndpoint");
-            }
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("direct:start")
+                .to("knative:endpoint/myEndpoint");
         });
 
         context.start();
@@ -1177,6 +1127,10 @@ public class KnativeHttpTest {
     @EnumSource(CloudEvents.class)
     void testNoContent(CloudEvent ce) throws Exception {
         final int wordsPort = AvailablePortFinder.getNextAvailable();
+        final KnativeHttpServer server = new KnativeHttpServer(context, 
wordsPort, event -> {
+            event.response().setStatusCode(204);
+            event.response().end("");
+        });
 
         configureKnativeComponent(
             context,
@@ -1210,25 +1164,14 @@ public class KnativeHttpTest {
                 ))
         );
 
-        Undertow server = Undertow.builder()
-            .addHttpListener(wordsPort, "localhost")
-            .setHandler(exchange -> {
-                exchange.setStatusCode(204);
-                exchange.getResponseSender().send("");
-            })
-            .build();
-
         try {
             server.start();
 
-            context.addRoutes(new RouteBuilder() {
-                @Override
-                public void configure() throws Exception {
-                    from("knative:channel/messages")
-                        .transform().simple("transformed ${body}")
-                        .log("${body}")
-                        .to("knative:channel/words");
-                }
+            RouteBuilder.addRoutes(context, b -> {
+                b.from("knative:channel/messages")
+                    .transform().simple("transformed ${body}")
+                    .log("${body}")
+                    .to("knative:channel/words");
             });
 
             context.start();
@@ -1255,26 +1198,23 @@ public class KnativeHttpTest {
 
         configureKnativeComponent(context, ce, hops);
 
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:start")
-                    .routeId("http")
-                    .toF("http://localhost:%d";, platformHttpPort)
-                    .convertBodyTo(String.class);
-
-                for (KnativeEnvironment.KnativeServiceDefinition definition: 
hops) {
-                    fromF("knative:endpoint/%s", definition.getName())
-                        .routeId(definition.getName())
-                        .setBody().constant(definition.getName());
-                }
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("direct:start")
+                .routeId("http")
+                .toF("http://localhost:%d";, platformHttpPort)
+                .convertBodyTo(String.class);
+
+            for (KnativeEnvironment.KnativeServiceDefinition definition : 
hops) {
+                b.fromF("knative:endpoint/%s", definition.getName())
+                    .routeId(definition.getName())
+                    .setBody().constant(definition.getName());
             }
         });
 
         context.start();
 
         List<String> hopsDone = new ArrayList<>();
-        for (KnativeEnvironment.KnativeServiceDefinition definition: hops) {
+        for (KnativeEnvironment.KnativeServiceDefinition definition : hops) {
             hopsDone.add(definition.getName());
 
             Exchange result = template.request(
@@ -1293,6 +1233,7 @@ public class KnativeHttpTest {
     @EnumSource(CloudEvents.class)
     void testHeaders(CloudEvent ce) throws Exception {
         final int port = AvailablePortFinder.getNextAvailable();
+        final KnativeHttpServer server = new KnativeHttpServer(context, port);
 
         configureKnativeComponent(
             context,
@@ -1309,17 +1250,6 @@ public class KnativeHttpTest {
             )
         );
 
-        CountDownLatch latch = new CountDownLatch(1);
-        AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
-
-        Undertow server = Undertow.builder()
-            .addHttpListener(port, "localhost")
-            .setHandler(se -> {
-                exchange.set(se);
-                latch.countDown();
-            })
-            .build();
-
         RouteBuilder.addRoutes(context, b -> {
             b.from("direct:start")
                 .to("knative:endpoint/ep");
@@ -1330,16 +1260,13 @@ public class KnativeHttpTest {
             server.start();
             template.sendBody("direct:start", "");
 
-            latch.await();
-
-            HeaderMap headers = exchange.get().getRequestHeaders();
-
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("org.apache.camel.event");
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
-            
assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
-        }  finally {
+            HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("org.apache.camel.event");
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
+            
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+        } finally {
             server.stop();
         }
     }
@@ -1348,6 +1275,7 @@ public class KnativeHttpTest {
     @EnumSource(CloudEvents.class)
     void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception {
         final int port = AvailablePortFinder.getNextAvailable();
+        final KnativeHttpServer server = new KnativeHttpServer(context, port);
         final String typeHeaderKey = 
ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
         final String typeHeaderVal = UUID.randomUUID().toString();
         final String sourceHeaderKey = 
ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http();
@@ -1370,17 +1298,6 @@ public class KnativeHttpTest {
             )
         );
 
-        CountDownLatch latch = new CountDownLatch(1);
-        AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
-
-        Undertow server = Undertow.builder()
-            .addHttpListener(port, "localhost")
-            .setHandler(se -> {
-                exchange.set(se);
-                latch.countDown();
-            })
-            .build();
-
         RouteBuilder.addRoutes(context, b -> {
             b.from("direct:start")
                 .to("knative:endpoint/ep");
@@ -1391,16 +1308,13 @@ public class KnativeHttpTest {
             server.start();
             template.sendBody("direct:start", "");
 
-            latch.await();
-
-            HeaderMap headers = exchange.get().getRequestHeaders();
-
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
-            
assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
-        }  finally {
+            HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
+            
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+        } finally {
             server.stop();
         }
     }
@@ -1409,6 +1323,7 @@ public class KnativeHttpTest {
     @EnumSource(CloudEvents.class)
     void testHeadersOverrideFromURI(CloudEvent ce) throws Exception {
         final int port = AvailablePortFinder.getNextAvailable();
+        final KnativeHttpServer server = new KnativeHttpServer(context, port);
         final String typeHeaderKey = 
ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
         final String typeHeaderVal = UUID.randomUUID().toString();
         final String sourceHeaderKey = 
ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http();
@@ -1429,17 +1344,6 @@ public class KnativeHttpTest {
             )
         );
 
-        CountDownLatch latch = new CountDownLatch(1);
-        AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
-
-        Undertow server = Undertow.builder()
-            .addHttpListener(port, "localhost")
-            .setHandler(se -> {
-                exchange.set(se);
-                latch.countDown();
-            })
-            .build();
-
         RouteBuilder.addRoutes(context, b -> {
             b.from("direct:start")
                 .toF("knative:endpoint/ep?%s=%s&%s=%s",
@@ -1452,16 +1356,13 @@ public class KnativeHttpTest {
             server.start();
             template.sendBody("direct:start", "");
 
-            latch.await();
-
-            HeaderMap headers = exchange.get().getRequestHeaders();
-
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
-            
assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
-        }  finally {
+            HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
+            
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+        } finally {
             server.stop();
         }
     }
@@ -1470,6 +1371,7 @@ public class KnativeHttpTest {
     @EnumSource(CloudEvents.class)
     void testHeadersOverrideFromConf(CloudEvent ce) throws Exception {
         final int port = AvailablePortFinder.getNextAvailable();
+        final KnativeHttpServer server = new KnativeHttpServer(context, port);
         final String typeHeaderKey = 
ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
         final String typeHeaderVal = UUID.randomUUID().toString();
         final String sourceHeaderKey = 
ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http();
@@ -1495,17 +1397,6 @@ public class KnativeHttpTest {
             Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, 
sourceHeaderVal
         ));
 
-        CountDownLatch latch = new CountDownLatch(1);
-        AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
-
-        Undertow server = Undertow.builder()
-            .addHttpListener(port, "localhost")
-            .setHandler(se -> {
-                exchange.set(se);
-                latch.countDown();
-            })
-            .build();
-
         RouteBuilder.addRoutes(context, b -> {
             b.from("direct:start")
                 .to("knative:endpoint/ep");
@@ -1516,16 +1407,13 @@ public class KnativeHttpTest {
             server.start();
             template.sendBody("direct:start", "");
 
-            latch.await();
-
-            HeaderMap headers = exchange.get().getRequestHeaders();
-
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
-            
assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
-        }  finally {
+            HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
+            
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+        } finally {
             server.stop();
         }
     }
@@ -1534,6 +1422,7 @@ public class KnativeHttpTest {
     @EnumSource(CloudEvents.class)
     void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws 
Exception {
         final int port = AvailablePortFinder.getNextAvailable();
+        final KnativeHttpServer server = new KnativeHttpServer(context, port);
 
         configureKnativeComponent(
             context,
@@ -1550,17 +1439,6 @@ public class KnativeHttpTest {
             )
         );
 
-        CountDownLatch latch = new CountDownLatch(1);
-        AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
-
-        Undertow server = Undertow.builder()
-            .addHttpListener(port, "localhost")
-            .setHandler(se -> {
-                exchange.set(se);
-                latch.countDown();
-            })
-            .build();
-
         RouteBuilder.addRoutes(context, b -> {
             b.from("direct:start")
                 
.setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("myType")
@@ -1572,16 +1450,13 @@ public class KnativeHttpTest {
             server.start();
             template.sendBody("direct:start", "");
 
-            latch.await();
-
-            HeaderMap headers = exchange.get().getRequestHeaders();
-
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("myType");
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
-            
assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
-        }  finally {
+            HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("myType");
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
+            
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+        } finally {
             server.stop();
         }
     }
@@ -1590,6 +1465,7 @@ public class KnativeHttpTest {
     @EnumSource(CloudEvents.class)
     void testHeadersOverrideFromRouteWithCEHeader(CloudEvent ce) throws 
Exception {
         final int port = AvailablePortFinder.getNextAvailable();
+        final KnativeHttpServer server = new KnativeHttpServer(context, port);
 
         configureKnativeComponent(
             context,
@@ -1606,17 +1482,6 @@ public class KnativeHttpTest {
             )
         );
 
-        CountDownLatch latch = new CountDownLatch(1);
-        AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
-
-        Undertow server = Undertow.builder()
-            .addHttpListener(port, "localhost")
-            .setHandler(se -> {
-                exchange.set(se);
-                latch.countDown();
-            })
-            .build();
-
         RouteBuilder.addRoutes(context, b -> {
             b.from("direct:start")
                 
.setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http()).constant("fromCEHeader")
@@ -1629,16 +1494,69 @@ public class KnativeHttpTest {
             server.start();
             template.sendBody("direct:start", "");
 
-            latch.await();
+            HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("fromCEHeader");
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
+            
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+        } finally {
+            server.stop();
+        }
+    }
 
-            HeaderMap headers = exchange.get().getRequestHeaders();
+    @ParameterizedTest
+    @EnumSource(CloudEvents.class)
+    void testEventBridge(CloudEvent ce) throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        final KnativeHttpServer server = new KnativeHttpServer(context, port);
+
+        configureKnativeComponent(
+            context,
+            ce,
+            event(
+                Knative.EndpointKind.sink,
+                "event.sink",
+                "localhost",
+                port,
+                mapOf(
+                    Knative.CONTENT_TYPE, "text/plain"
+                )),
+            sourceEvent(
+                "event.source",
+                mapOf(
+                    Knative.CONTENT_TYPE, "text/plain"
+                ))
+        );
+
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("knative:event/event.source")
+                .to("knative:event/event.sink");
+        });
+
+        context.start();
+
+        try {
+            server.start();
+
+            given()
+                .body("test")
+                .header(Exchange.CONTENT_TYPE, "text/plain")
+                
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), 
ce.version())
+                
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), 
"event.source")
+                
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), 
"myEventID")
+                
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), 
DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+                
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), 
"/somewhere")
+            .when()
+                .post()
+            .then()
+                .statusCode(204);
 
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("fromCEHeader");
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
-            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
-            
assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
-        }  finally {
+            HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+            
assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("event.sink");
+            
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+        } finally {
             server.stop();
         }
     }
diff --git 
a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
 
b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
index 73fac6e..d5a73da 100644
--- 
a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
+++ 
b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
@@ -31,6 +31,8 @@ import org.apache.camel.component.knative.KnativeEndpoint;
 import org.apache.camel.component.knative.spi.CloudEvent;
 import org.apache.camel.component.knative.spi.Knative;
 import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 abstract class AbstractCloudEventProcessor implements CloudEventProcessor {
     private final CloudEvent cloudEvent;
@@ -57,7 +59,7 @@ abstract class AbstractCloudEventProcessor implements 
CloudEventProcessor {
                 final Map<String, Object> headers = 
exchange.getIn().getHeaders();
 
                 for (CloudEvent.Attribute attribute: ce.attributes()) {
-                    Object val = headers.get(attribute.http());
+                    Object val = headers.remove(attribute.http());
                     if (val != null) {
                         headers.put(attribute.id(), val);
                     }
@@ -75,6 +77,7 @@ abstract class AbstractCloudEventProcessor implements 
CloudEventProcessor {
     @Override
     public Processor producer(KnativeEndpoint endpoint, 
KnativeEnvironment.KnativeServiceDefinition service) {
         final CloudEvent ce = cloudEvent();
+        final Logger logger = LoggerFactory.getLogger(getClass());
 
         return exchange -> {
             final String contentType = 
service.getMetadata().get(Knative.CONTENT_TYPE);
@@ -89,12 +92,33 @@ abstract class AbstractCloudEventProcessor implements 
CloudEventProcessor {
 
             headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
 
+            //
+            // in case of events, the type of the event defined as URI param 
so we need
+            // to override it to avoid the event type be overridden by 
Messages's headers
+            //
+            if (endpoint.getType() == Knative.Type.event) {
+                Object eventType = 
headers.get(CloudEvent.CAMEL_CLOUD_EVENT_TYPE);
+
+                if (eventType != null) {
+                    logger.debug("Detected the presence of {} header with 
value {}: it will be ignored and replaced by value set as uri parameter {}",
+                        CloudEvent.CAMEL_CLOUD_EVENT_TYPE,
+                        eventType,
+                        endpoint.getName());
+                }
+
+                
headers.put(cloudEvent().mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(),
 endpoint.getName());
+            } else {
+                setCloudEventHeader(headers, 
CloudEvent.CAMEL_CLOUD_EVENT_TYPE, () -> {
+                    return service.getMetadata().getOrDefault(
+                        Knative.KNATIVE_EVENT_TYPE,
+                        endpoint.getConfiguration().getCloudEventsType()
+                    );
+                });
+            }
+
             setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_ID, 
exchange::getExchangeId);
             setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, 
endpoint::getEndpointUri);
             setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_VERSION, 
ce::version);
-            setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_TYPE, () 
-> {
-                return 
service.getMetadata().getOrDefault(Knative.KNATIVE_EVENT_TYPE, 
endpoint.getConfiguration().getCloudEventsType());
-            });
             setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_TIME, () 
-> {
                 final ZonedDateTime created = 
ZonedDateTime.ofInstant(Instant.ofEpochMilli(exchange.getCreated()), 
ZoneId.systemDefault());
                 final String eventTime = 
DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);

Reply via email to