This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit 0f996a03f8a8904ec7dfc84f79aae4de68e641a2 Author: lburgazzoli <lburgazz...@gmail.com> AuthorDate: Thu Jul 9 12:14:40 2020 +0200 Event-loop blocked when using http component starting from knative #383 --- camel-knative/camel-knative-http/pom.xml | 4 ++ .../knative/http/KnativeHttpConsumer.java | 82 +++++++++++++++------- .../component/knative/http/KnativeHttpTest.java | 64 +++++++++++++++++ 3 files changed, 125 insertions(+), 25 deletions(-) diff --git a/camel-knative/camel-knative-http/pom.xml b/camel-knative/camel-knative-http/pom.xml index c6d2c85..3ee7d3e 100644 --- a/camel-knative/camel-knative-http/pom.xml +++ b/camel-knative/camel-knative-http/pom.xml @@ -43,6 +43,10 @@ <groupId>org.apache.camel</groupId> <artifactId>camel-cloud</artifactId> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-http</artifactId> + </dependency> <dependency> <groupId>org.apache.camel.k</groupId> diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java index 4764e40..4ce46b1 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java @@ -163,42 +163,74 @@ public class KnativeHttpConsumer extends DefaultConsumer { try { createUoW(exchange); - getAsyncProcessor().process(exchange, doneSync -> { - try { - HttpServerResponse response = toHttpResponse(request, exchange.getMessage()); - Buffer body = null; - - if (request.response().getStatusCode() != 204) { - body = computeResponseBody(exchange.getMessage()); - - // set the content type in the response. - String contentType = MessageHelper.getContentType(exchange.getMessage()); - if (contentType != null) { - // set content-type - response.putHeader(Exchange.CONTENT_TYPE, contentType); - } + + // We do not know if any of the processing logic of the route is synchronous or not so we + // need to process the request on a thread on the Vert.x worker pool. + // + // As example the following route may block the Vert.x event loop as the camel-http component + // is not async so if the service is scaled-down, the it may take a while to become ready and + // the camel-http component blocks until the service becomes available. + // + // from("knative:event/my.event") + // .to("http://{{env:PROJECT}}.{{env:NAMESPACE}}.svc.cluster.local/service"); + // + router.vertx().executeBlocking( + promise -> { + try { + // no need to use an async processor as the processing happen in + // a dedicated thread ans it won't block the Vert.x event loop + getProcessor().process(exchange); + promise.complete(); + } catch (Exception e) { + promise.fail(e); } + }, + false, + result -> { + if (result.succeeded()) { + try { + HttpServerResponse response = toHttpResponse(request, exchange.getMessage()); + Buffer body = null; + + if (request.response().getStatusCode() != 204) { + body = computeResponseBody(exchange.getMessage()); + + // set the content type in the response. + String contentType = MessageHelper.getContentType(exchange.getMessage()); + if (contentType != null) { + // set content-type + response.putHeader(Exchange.CONTENT_TYPE, contentType); + } + } + + if (body != null) { + request.response().end(body); + } else { + request.response().setStatusCode(204); + request.response().end(); + } + } catch (Exception e) { + getExceptionHandler().handleException(e); + } + } else if (result.failed()) { + getExceptionHandler().handleException(result.cause()); - if (body != null) { - request.response().end(body); - } else { - request.response().setStatusCode(204); - request.response().end(); + request.response().setStatusCode(500); + request.response().putHeader(Exchange.CONTENT_TYPE, "text/plain"); + request.response().end(result.cause().getMessage()); } - } catch (Exception e) { - getExceptionHandler().handleException(e); - } - }); + + doneUoW(exchange); + }); } catch (Exception e) { getExceptionHandler().handleException(e); request.response().setStatusCode(500); request.response().putHeader(Exchange.CONTENT_TYPE, "text/plain"); request.response().end(e.getMessage()); - } finally { + doneUoW(exchange); } - } private Message toMessage(HttpServerRequest request, Exchange exchange) { diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java index a896c1c..ee55fcc 100644 --- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java @@ -44,6 +44,7 @@ import org.apache.camel.http.base.HttpOperationFailedException; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.k.http.PlatformHttpServiceContextCustomizer; import org.apache.camel.k.test.AvailablePortFinder; +import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.ObjectHelper; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -91,6 +92,8 @@ public class KnativeHttpTest { @AfterEach public void after() { + ServiceHelper.stopService(template); + if (this.context != null) { this.context.stop(); } @@ -1680,5 +1683,66 @@ public class KnativeHttpTest { server.stop(); } } + + @ParameterizedTest + @EnumSource(CloudEvents.class) + void testSlowConsumer(CloudEvent ce) throws Exception { + final int port = AvailablePortFinder.getNextAvailable(); + final KnativeHttpServer server = new KnativeHttpServer(context, port, event -> { + event.vertx().executeBlocking( + promise -> { + try { + Thread.sleep(5000); + promise.complete(); + } catch (InterruptedException e) { + promise.fail(e); + } + }, + false, + result -> { + event.response().setStatusCode(200); + event.response().end(""); + } + ); + }); + + configureKnativeComponent( + context, + ce, + sourceEndpoint( + "start", + mapOf( + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + )) + ); + + try { + server.start(); + + RouteBuilder.addRoutes(context, b -> { + b.from("knative:endpoint/start") + .removeHeaders("Camel*") + .toF("http://localhost:%d", port); + }); + + context.start(); + + given() + .body("test") + .header(Exchange.CONTENT_TYPE, "text/plain") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID") + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())) + .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere") + .when() + .post() + .then() + .statusCode(200); + } finally { + server.stop(); + } + } }