This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 0f996a03f8a8904ec7dfc84f79aae4de68e641a2
Author: lburgazzoli <lburgazz...@gmail.com>
AuthorDate: Thu Jul 9 12:14:40 2020 +0200

    Event-loop blocked when using http component starting from knative #383
---
 camel-knative/camel-knative-http/pom.xml           |  4 ++
 .../knative/http/KnativeHttpConsumer.java          | 82 +++++++++++++++-------
 .../component/knative/http/KnativeHttpTest.java    | 64 +++++++++++++++++
 3 files changed, 125 insertions(+), 25 deletions(-)

diff --git a/camel-knative/camel-knative-http/pom.xml 
b/camel-knative/camel-knative-http/pom.xml
index c6d2c85..3ee7d3e 100644
--- a/camel-knative/camel-knative-http/pom.xml
+++ b/camel-knative/camel-knative-http/pom.xml
@@ -43,6 +43,10 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-cloud</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-http</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.camel.k</groupId>
diff --git 
a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
 
b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
index 4764e40..4ce46b1 100644
--- 
a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
+++ 
b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
@@ -163,42 +163,74 @@ public class KnativeHttpConsumer extends DefaultConsumer {
 
         try {
             createUoW(exchange);
-            getAsyncProcessor().process(exchange, doneSync -> {
-                try {
-                    HttpServerResponse response = toHttpResponse(request, 
exchange.getMessage());
-                    Buffer body = null;
-
-                    if (request.response().getStatusCode() != 204) {
-                        body = computeResponseBody(exchange.getMessage());
-
-                        // set the content type in the response.
-                        String contentType = 
MessageHelper.getContentType(exchange.getMessage());
-                        if (contentType != null) {
-                            // set content-type
-                            response.putHeader(Exchange.CONTENT_TYPE, 
contentType);
-                        }
+
+            // We do not know if any of the processing logic of the route is 
synchronous or not so we
+            // need to process the request on a thread on the Vert.x worker 
pool.
+            //
+            // As an example, the following route may block the Vert.x event loop 
as the camel-http component
+            // is not async, so if the service is scaled-down, it may take 
a while to become ready and
+            // the camel-http component blocks until the service becomes 
available.
+            //
+            // from("knative:event/my.event")
+            //        
.to("http://{{env:PROJECT}}.{{env:NAMESPACE}}.svc.cluster.local/service");
+            //
+            router.vertx().executeBlocking(
+                promise -> {
+                    try {
+                        // no need to use an async processor as the processing 
happens in
+                        // a dedicated thread and it won't block the Vert.x 
event loop
+                        getProcessor().process(exchange);
+                        promise.complete();
+                    } catch (Exception e) {
+                        promise.fail(e);
                     }
+                },
+                false,
+                result -> {
+                    if (result.succeeded()) {
+                        try {
+                            HttpServerResponse response = 
toHttpResponse(request, exchange.getMessage());
+                            Buffer body = null;
+
+                            if (request.response().getStatusCode() != 204) {
+                                body = 
computeResponseBody(exchange.getMessage());
+
+                                // set the content type in the response.
+                                String contentType = 
MessageHelper.getContentType(exchange.getMessage());
+                                if (contentType != null) {
+                                    // set content-type
+                                    response.putHeader(Exchange.CONTENT_TYPE, 
contentType);
+                                }
+                            }
+
+                            if (body != null) {
+                                request.response().end(body);
+                            } else {
+                                request.response().setStatusCode(204);
+                                request.response().end();
+                            }
+                        } catch (Exception e) {
+                            getExceptionHandler().handleException(e);
+                        }
+                    } else if (result.failed()) {
+                        getExceptionHandler().handleException(result.cause());
 
-                    if (body != null) {
-                        request.response().end(body);
-                    } else {
-                        request.response().setStatusCode(204);
-                        request.response().end();
+                        request.response().setStatusCode(500);
+                        request.response().putHeader(Exchange.CONTENT_TYPE, 
"text/plain");
+                        request.response().end(result.cause().getMessage());
                     }
-                } catch (Exception e) {
-                    getExceptionHandler().handleException(e);
-                }
-            });
+
+                    doneUoW(exchange);
+                });
         } catch (Exception e) {
             getExceptionHandler().handleException(e);
 
             request.response().setStatusCode(500);
             request.response().putHeader(Exchange.CONTENT_TYPE, "text/plain");
             request.response().end(e.getMessage());
-        } finally {
+
             doneUoW(exchange);
         }
-
     }
 
     private Message toMessage(HttpServerRequest request, Exchange exchange) {
diff --git 
a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
 
b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index a896c1c..ee55fcc 100644
--- 
a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ 
b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -44,6 +44,7 @@ import 
org.apache.camel.http.base.HttpOperationFailedException;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.k.http.PlatformHttpServiceContextCustomizer;
 import org.apache.camel.k.test.AvailablePortFinder;
+import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -91,6 +92,8 @@ public class KnativeHttpTest {
 
     @AfterEach
     public void after() {
+        ServiceHelper.stopService(template);
+
         if (this.context != null) {
             this.context.stop();
         }
@@ -1680,5 +1683,66 @@ public class KnativeHttpTest {
             server.stop();
         }
     }
+
+    @ParameterizedTest
+    @EnumSource(CloudEvents.class)
+    void testSlowConsumer(CloudEvent ce) throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        final KnativeHttpServer server = new KnativeHttpServer(context, port, 
event -> {
+            event.vertx().executeBlocking(
+                promise -> {
+                    try {
+                        Thread.sleep(5000);
+                        promise.complete();
+                    } catch (InterruptedException e) {
+                        promise.fail(e);
+                    }
+                },
+                false,
+                result -> {
+                    event.response().setStatusCode(200);
+                    event.response().end("");
+                }
+            );
+        });
+
+        configureKnativeComponent(
+            context,
+            ce,
+            sourceEndpoint(
+                "start",
+                mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                ))
+        );
+
+        try {
+            server.start();
+
+            RouteBuilder.addRoutes(context, b -> {
+                b.from("knative:endpoint/start")
+                    .removeHeaders("Camel*")
+                    .toF("http://localhost:%d", port);
+            });
+
+            context.start();
+
+            given()
+                .body("test")
+                .header(Exchange.CONTENT_TYPE, "text/plain")
+                
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), 
ce.version())
+                
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), 
"org.apache.camel.event")
+                
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), 
"myEventID")
+                
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), 
DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+                
.header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), 
"/somewhere")
+                .when()
+                    .post()
+                .then()
+                    .statusCode(200);
+        } finally {
+            server.stop();
+        }
+    }
 }
 

Reply via email to