This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new eefdd528b80 CAMEL-19905: Add streaming option to camel-platform-http-vertx eefdd528b80 is described below commit eefdd528b80ec28da81a3b4401704b3dfbdf5d00 Author: James Netherton <jamesnether...@gmail.com> AuthorDate: Thu Nov 9 13:39:12 2023 +0000 CAMEL-19905: Add streaming option to camel-platform-http-vertx --- .../camel/catalog/components/platform-http.json | 13 +- .../src/main/docs/platform-http-vertx.adoc | 12 + .../platform/http/vertx/AsyncInputStream.java | 243 +++++++++++++++++++ .../http/vertx/DefaultHttpRequestBodyHandler.java | 50 ++++ .../http/vertx/HttpRequestBodyHandler.java | 50 ++++ .../vertx/StreamingHttpRequestBodyHandler.java | 93 ++++++++ .../http/vertx/VertxPlatformHttpConsumer.java | 142 ++++++----- .../http/vertx/VertxPlatformHttpSupport.java | 89 +++++-- .../http/vertx/VertxPlatformHttpEngineTest.java | 2 +- ...VertxPlatformHttpLargeMessageStreamingTest.java | 94 ++++++++ .../http/vertx/VertxPlatformHttpProxyTest.java | 10 +- .../http/vertx/VertxPlatformHttpStreamingTest.java | 264 +++++++++++++++++++++ .../http/PlatformHttpEndpointConfigurer.java | 6 + .../http/PlatformHttpEndpointUriFactory.java | 3 +- .../component/platform/http/platform-http.json | 13 +- .../platform/http/PlatformHttpEndpoint.java | 11 + .../dsl/PlatformHttpEndpointBuilderFactory.java | 31 +++ 17 files changed, 1009 insertions(+), 117 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/platform-http.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/platform-http.json index bb67bb10512..69c9b292526 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/platform-http.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/platform-http.json @@ -33,11 +33,12 @@ "matchOnUriPrefix": { "index": 3, "kind": "parameter", "displayName": "Match On Uri Prefix", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether or not the consumer should try to find a target consumer by matching the URI prefix if no exact match is found." }, "muteException": { "index": 4, "kind": "parameter", "displayName": "Mute Exception", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "If enabled and an Exchange failed processing on the consumer side the response's body won't contain the exception's stack trace." }, "produces": { "index": 5, "kind": "parameter", "displayName": "Produces", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The content type this endpoint produces, such as application\/xml or application\/json." }, - "bridgeErrorHandler": { "index": 6, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] - "exceptionHandler": { "index": 7, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...] - "exchangePattern": { "index": 8, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, - "fileNameExtWhitelist": { "index": 9, "kind": "parameter", "displayName": "File Name Ext Whitelist", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "A comma or whitespace separated list of file extensions. Uploads having these extensions will be stored locally. Null value or asterisk () will allow all files." }, - "headerFilterStrategy": { "index": 10, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter headers to and from Camel message." }, - "platformHttpEngine": { "index": 11, "kind": "parameter", "displayName": "Platform Http Engine", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.platform.http.spi.PlatformHttpEngine", "deprecated": false, "autowired": false, "secret": false, "description": "An HTTP Server engine implementation to serve the requests of this endpoint." } + "useStreaming": { "index": 6, "kind": "parameter", "displayName": "Use Streaming", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to use streaming for large requests and responses" }, + "bridgeErrorHandler": { "index": 7, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] + "exceptionHandler": { "index": 8, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...] + "exchangePattern": { "index": 9, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, + "fileNameExtWhitelist": { "index": 10, "kind": "parameter", "displayName": "File Name Ext Whitelist", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "A comma or whitespace separated list of file extensions. Uploads having these extensions will be stored locally. Null value or asterisk () will allow all files." }, + "headerFilterStrategy": { "index": 11, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter headers to and from Camel message." }, + "platformHttpEngine": { "index": 12, "kind": "parameter", "displayName": "Platform Http Engine", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.platform.http.spi.PlatformHttpEngine", "deprecated": false, "autowired": false, "secret": false, "description": "An HTTP Server engine implementation to serve the requests of this endpoint." } } } diff --git a/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc b/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc index 118225964ff..176d5bef976 100644 --- a/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc +++ b/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc @@ -96,3 +96,15 @@ Camel `HttpMessage` as shown in the custom `Processor` below : }); ---- +== Handling large request / response payloads + +When large request / response payloads are expected, there is a `useStreaming` option, which can be enabled to improve performance. +When `useStreaming` is `true`, it will take advantage of xref:manual::stream-caching.adoc[stream caching]. In conjunction with enabling disk spooling, you can avoid having to store the entire request body payload in memory. + +[source,java] +---- +// Handle a large request body and stream it to a file +from("platform-http:/upload?httpMethodRestrict=POST&useStreaming=true") + .log("Processing large request body...") + .to("file:/uploads?fileName=uploaded.txt") +---- diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java new file mode 100644 index 00000000000..813dbbe6a85 --- /dev/null +++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.platform.http.vertx; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Context; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.impl.InboundBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link ReadStream} that can process an {@link InputStream} in an asynchronous way, so that the content can be pumped + * to the {@link io.vertx.core.streams.WriteStream} of an {@link io.vertx.core.http.HttpServerResponse}. + */ +public class AsyncInputStream implements ReadStream<Buffer> { + private static final Logger LOG = LoggerFactory.getLogger(AsyncInputStream.class); + private static final int DEFAULT_BUFFER_SIZE = 4096; + + private final ReadableByteChannel channel; + private final Vertx vertx; + private final Context context; + private final InboundBuffer<Buffer> queue; + private long readPos; + private boolean closed; + private boolean readInProgress; + private Handler<Buffer> dataHandler; + private Handler<Void> endHandler; + private Handler<Throwable> exceptionHandler; + + public AsyncInputStream(Vertx vertx, Context context, InputStream inputStream) { + this.vertx = vertx; + this.context = context; + this.channel = Channels.newChannel(inputStream); + this.queue = new InboundBuffer<>(context, 0); + queue.handler(buffer -> { + if (buffer.length() > 0) { + handleData(buffer); + } else { + handleEnd(); + } + }); + queue.drainHandler(v -> doRead()); + } + + @Override + public synchronized AsyncInputStream endHandler(Handler<Void> endHandler) { + checkStreamClosed(); + this.endHandler = endHandler; + return this; + } + + @Override + public synchronized AsyncInputStream exceptionHandler(Handler<Throwable> exceptionHandler) { + checkStreamClosed(); + this.exceptionHandler = exceptionHandler; + return this; + } + + @Override + public synchronized AsyncInputStream handler(Handler<Buffer> handler) { + checkStreamClosed(); + this.dataHandler = handler; + if (this.dataHandler != null && !this.closed) { + this.doRead(); + } else { + queue.clear(); + } + return this; + } + + @Override + public synchronized AsyncInputStream pause() { + checkStreamClosed(); + queue.pause(); + return this; + } + + @Override + public synchronized AsyncInputStream resume() { + checkStreamClosed(); + queue.resume(); + return this; + } + + @Override + public ReadStream<Buffer> fetch(long amount) { + checkStreamClosed(); + queue.fetch(amount); + return this; + } + + public void close(Handler<AsyncResult<Void>> handler) { + closeInternal(handler); + } + + private void checkStreamClosed() { + if (this.closed) { + throw new IllegalStateException("Stream closed"); + } + } + + private void checkContext() { + Context contextToCheck = vertx.getOrCreateContext(); + if (!contextToCheck.equals(context)) { + throw new IllegalStateException( + "AsyncInputStream must only be used in the context that created it, expected: " + this.context + + " actual " + contextToCheck); + } + } + + private synchronized void closeInternal(Handler<AsyncResult<Void>> handler) { + closed = true; + doClose(handler); + } + + private void doClose(Handler<AsyncResult<Void>> handler) { + try { + channel.close(); + if (handler != null) { + this.vertx.runOnContext(v -> handler.handle(Future.succeededFuture())); + } + } catch (IOException e) { + if (handler != null) { + this.vertx.runOnContext(v -> handler.handle(Future.failedFuture(e))); + } + } + } + + private void doRead() { + checkStreamClosed(); + doRead(ByteBuffer.allocate(DEFAULT_BUFFER_SIZE)); + } + + private synchronized void doRead(ByteBuffer buffer) { + if (!readInProgress) { + readInProgress = true; + Buffer buff = Buffer.buffer(DEFAULT_BUFFER_SIZE); + doRead(buff, 0, buffer, readPos, result -> { + if (result.succeeded()) { + readInProgress = false; + Buffer updatedBuffer = result.result(); + readPos += updatedBuffer.length(); + if (queue.write(updatedBuffer) && updatedBuffer.length() > 0) { + doRead(buffer); + } + } else { + handleException(result.cause()); + } + }); + } + } + + private void doRead(Buffer writeBuff, int offset, ByteBuffer buffer, long position, Handler<AsyncResult<Buffer>> handler) { + vertx.executeBlocking(() -> channel.read(buffer)) + .onComplete(result -> { + if (result.succeeded()) { + Integer bytesRead = result.result(); + if (bytesRead == -1) { + // EOF + context.runOnContext((v) -> { + buffer.flip(); + writeBuff.setBytes(offset, buffer); + buffer.compact(); + handler.handle(Future.succeededFuture(writeBuff)); + }); + } else if (buffer.hasRemaining()) { + // Read from the next offset + context.runOnContext((v) -> { + doRead(writeBuff, offset, buffer, position + bytesRead, handler); + }); + } else { + // All data is written + context.runOnContext((v) -> { + buffer.flip(); + writeBuff.setBytes(offset, buffer); + buffer.compact(); + handler.handle(Future.succeededFuture(writeBuff)); + }); + } + } else { + context.runOnContext((v) -> handler.handle(Future.failedFuture(result.cause()))); + } + }); + } + + private void handleData(Buffer buffer) { + Handler<Buffer> handler; + synchronized (this) { + handler = this.dataHandler; + } + if (handler != null) { + checkContext(); + handler.handle(buffer); + } + } + + private synchronized void handleEnd() { + Handler<Void> endHandler; + synchronized (this) { + dataHandler = null; + endHandler = this.endHandler; + } + if (endHandler != null) { + checkContext(); + endHandler.handle(null); + } + } + + private void handleException(Throwable t) { + if (exceptionHandler != null && t instanceof Exception) { + exceptionHandler.handle(t); + } else { + if (LOG.isErrorEnabled()) { + LOG.error("Unhandled error while processing stream", t); + } + } + } +} diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/DefaultHttpRequestBodyHandler.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/DefaultHttpRequestBodyHandler.java new file mode 100644 index 00000000000..4f187da7f0a --- /dev/null +++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/DefaultHttpRequestBodyHandler.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.platform.http.vertx; + +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.ext.web.RequestBody; +import io.vertx.ext.web.Route; +import io.vertx.ext.web.RoutingContext; +import org.apache.camel.Message; + +import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isFormUrlEncoded; +import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isMultiPartFormData; + +/** + * Default {@link HttpRequestBodyHandler} that will read to read the entire HTTP request body into memory. + */ +class DefaultHttpRequestBodyHandler extends HttpRequestBodyHandler { + DefaultHttpRequestBodyHandler(Handler<RoutingContext> delegate) { + super(delegate); + } + + @Override + void configureRoute(Route route) { + route.handler(delegate); + } + + @Override + Future<Void> handle(RoutingContext routingContext, Message message) { + if (!isMultiPartFormData(routingContext) && !isFormUrlEncoded(routingContext)) { + final RequestBody requestBody = routingContext.body(); + message.setBody(requestBody.buffer()); + } + return Future.succeededFuture(); + } +} diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/HttpRequestBodyHandler.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/HttpRequestBodyHandler.java new file mode 100644 index 00000000000..e537612a518 --- /dev/null +++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/HttpRequestBodyHandler.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.platform.http.vertx; + +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.ext.web.Route; +import io.vertx.ext.web.RoutingContext; +import org.apache.camel.Message; + +/** + * Abstraction to handle HTTP request body processing. + */ +abstract class HttpRequestBodyHandler { + Handler<RoutingContext> delegate; + + HttpRequestBodyHandler(Handler<RoutingContext> delegate) { + this.delegate = delegate; + } + + /** + * Performs any required configuration on a {@link Route}. + * + * @param route The route to configure + */ + abstract void configureRoute(Route route); + + /** + * Processes the incoming HTTP request body. + * + * @param routingContext The {@link RoutingContext} for the HTTP request being processed + * @param message The {@link Message} associated with the HTTP request being processed + * @return {@link Future} to determine when the HTTP request body has been fully processed + */ + abstract Future<Void> handle(RoutingContext routingContext, Message message); +} diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/StreamingHttpRequestBodyHandler.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/StreamingHttpRequestBodyHandler.java new file mode 100644 index 00000000000..88531a6a07e --- /dev/null +++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/StreamingHttpRequestBodyHandler.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.platform.http.vertx; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.ext.web.Route; +import io.vertx.ext.web.RoutingContext; +import org.apache.camel.Message; +import org.apache.camel.converter.stream.CachedOutputStream; + +import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isFormUrlEncoded; +import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isMultiPartFormData; + +/** + * A {@link HttpRequestBodyHandler} that can handle large request bodies via {@link CachedOutputStream}. + */ +class StreamingHttpRequestBodyHandler extends HttpRequestBodyHandler { + StreamingHttpRequestBodyHandler(Handler<RoutingContext> delegate) { + super(delegate); + } + + @Override + void configureRoute(Route route) { + // No configuration necessary for streaming + } + + @Override + Future<Void> handle(RoutingContext routingContext, Message message) { + // Reject multipart requests if streaming enabled as we can't be sure when Vert.x has + // fully written the attachments to disk after invoking the default body handler. + if (isMultiPartFormData(routingContext)) { + return Future.failedFuture( + new IllegalStateException("Cannot process multipart/form-data requests when useStreaming=true")); + } + + Promise<Void> promise = Promise.promise(); + HttpServerRequest request = routingContext.request(); + if (isFormUrlEncoded(routingContext)) { + // Delegate body handling to the default body handler + delegate.handle(routingContext); + request.endHandler(promise::complete); + } else { + // Process each body 'chunk' and write it to CachedOutputStream + CachedOutputStream stream = new CachedOutputStream(message.getExchange(), true); + AtomicReference<Exception> failureCause = new AtomicReference<>(); + request.handler(buffer -> { + try { + stream.write(buffer.getBytes()); + } catch (IOException e) { + failureCause.set(e); + } + }); + // After the body is read, close the CachedOutputStream and get an InputStream to use as the message body + request.endHandler(event -> { + try { + stream.close(); + + Exception failure = failureCause.get(); + if (failure == null) { + message.setBody(stream.getInputStream()); + promise.complete(); + } else { + promise.fail(failure); + } + } catch (IOException e) { + promise.fail(e); + } + }); + } + + return promise.future(); + } +} diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java index d948d5c833a..915c1926110 100644 --- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java +++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java @@ -26,15 +26,13 @@ import java.util.regex.Pattern; import jakarta.activation.DataHandler; -import io.vertx.core.AsyncResult; +import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; import io.vertx.ext.auth.User; import io.vertx.ext.web.FileUpload; -import io.vertx.ext.web.RequestBody; import io.vertx.ext.web.Route; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.impl.RouteImpl; @@ -56,6 +54,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.appendHeader; +import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isFormUrlEncoded; +import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isMultiPartFormData; import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.populateCamelHeaders; import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.writeResponse; @@ -74,6 +74,7 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen private String path; private Route route; private VertxPlatformHttpRouter router; + private HttpRequestBodyHandler httpRequestBodyHandler; public VertxPlatformHttpConsumer(PlatformHttpEndpoint endpoint, Processor processor, @@ -97,6 +98,11 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen methods = Method.parseList(getEndpoint().getHttpMethodRestrict()); path = configureEndpointPath(getEndpoint()); router = VertxPlatformHttpRouter.lookup(getEndpoint().getCamelContext()); + if (!getEndpoint().isHttpProxy() && getEndpoint().isUseStreaming()) { + httpRequestBodyHandler = new StreamingHttpRequestBodyHandler(router.bodyHandler()); + } else { + httpRequestBodyHandler = new DefaultHttpRequestBodyHandler(router.bodyHandler()); + } } @Override @@ -126,7 +132,7 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen } } - newRoute.handler(router.bodyHandler()); + httpRequestBodyHandler.configureRoute(newRoute); for (Handler<RoutingContext> handler : handlers) { newRoute.handler(handler); } @@ -161,7 +167,8 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen } final Vertx vertx = ctx.vertx(); - final Exchange exchange = toExchange(ctx); + final Exchange exchange = createExchange(false); + exchange.setPattern(ExchangePattern.InOut); // // We do not know if any of the processing logic of the route is synchronous or not so we @@ -180,45 +187,51 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen // .to("rest:get:?bridgeEndpoint=true"); // - if (getEndpoint().isHttpProxy()) { - handleProxy(ctx, exchange); - } + // Note: any logic that needs to interrogate HTTP headers not provided by RoutingContext.parsedHeaders, should + // be done inside of the following onComplete block, to ensure that the HTTP request is fully processed. + processHttpRequest(exchange, ctx).onComplete(result -> { + if (result.failed()) { + handleFailure(exchange, ctx, result.cause()); + return; + } - vertx.executeBlocking(() -> processRequest(exchange), false) - .onComplete(result -> processResult(ctx, result, exchange)); - } + if (getEndpoint().isHttpProxy()) { + handleProxy(ctx, exchange); + } - private void processResult( - RoutingContext ctx, AsyncResult<Object> result, Exchange exchange) { - Throwable failure = null; - try { - if (result.succeeded()) { - try { - writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy(), muteExceptions); - } catch (Exception e) { - failure = e; + populateMultiFormData(ctx, exchange.getIn(), getEndpoint().getHeaderFilterStrategy()); + + vertx.executeBlocking(() -> processExchange(exchange), false).onComplete(processExchangeResult -> { + if (processExchangeResult.succeeded()) { + writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy(), muteExceptions) + .onComplete(writeResponseResult -> { + if (writeResponseResult.succeeded()) { + handleExchangeComplete(exchange); + } else { + handleFailure(exchange, ctx, writeResponseResult.cause()); + } + }); + } else { + handleFailure(exchange, ctx, processExchangeResult.cause()); } - } else { - failure = result.cause(); - } + }); + }); + } - if (failure != null) { - handleFailure(ctx, failure); - } - } finally { - doneUoW(exchange); - releaseExchange(exchange, false); - } + private void handleExchangeComplete(Exchange exchange) { + doneUoW(exchange); + releaseExchange(exchange, false); } - private void handleFailure(RoutingContext ctx, Throwable failure) { + private void handleFailure(Exchange exchange, RoutingContext ctx, Throwable failure) { getExceptionHandler().handleException( "Failed handling platform-http endpoint " + getEndpoint().getPath(), failure); ctx.fail(failure); + handleExchangeComplete(exchange); } - private Object processRequest(Exchange exchange) throws Exception { + private Object processExchange(Exchange exchange) throws Exception { createUoW(exchange); getProcessor().process(exchange); return null; @@ -236,10 +249,7 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen exchange.getMessage().removeHeader("Proxy-Connection"); } - protected Exchange toExchange(RoutingContext ctx) { - final Exchange exchange = createExchange(false); - exchange.setPattern(ExchangePattern.InOut); - + protected Future<Void> processHttpRequest(Exchange exchange, RoutingContext ctx) { // reuse existing http message if pooled Message in = exchange.getIn(); if (in instanceof HttpMessage hm) { @@ -248,7 +258,6 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen in = new HttpMessage(exchange, ctx.request(), ctx.response()); exchange.setMessage(in); } - populateCamelMessage(ctx, exchange, in); final String charset = ctx.parsedHeaders().contentType().parameter("charset"); if (charset != null) { @@ -261,53 +270,38 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen in.setHeader(VertxPlatformHttpConstants.AUTHENTICATED_USER, user); } - return exchange; + return populateCamelMessage(ctx, exchange, in); } - protected void populateCamelMessage(RoutingContext ctx, Exchange exchange, Message result) { + protected Future<Void> populateCamelMessage(RoutingContext ctx, Exchange exchange, Message message) { final HeaderFilterStrategy headerFilterStrategy = getEndpoint().getHeaderFilterStrategy(); - populateCamelHeaders(ctx, result.getHeaders(), exchange, headerFilterStrategy); - final String mimeType = ctx.parsedHeaders().contentType().value(); - final boolean isMultipartFormData = "multipart/form-data".equals(mimeType); - - if ("application/x-www-form-urlencoded".equals(mimeType) || isMultipartFormData) { - populateMultiFormData(ctx, exchange, result, headerFilterStrategy, isMultipartFormData); - } else { - populateDefaultMessage(ctx, result); - } - } - - private static void populateDefaultMessage(RoutingContext ctx, Message result) { - final RequestBody requestBody = ctx.body(); - final Buffer body = requestBody.buffer(); - if (body != null) { - result.setBody(body); - } else { - result.setBody(null); - } + populateCamelHeaders(ctx, message.getHeaders(), exchange, headerFilterStrategy); + return httpRequestBodyHandler.handle(ctx, message); } private void populateMultiFormData( - RoutingContext ctx, Exchange exchange, Message result, HeaderFilterStrategy headerFilterStrategy, - boolean isMultipartFormData) { - final MultiMap formData = ctx.request().formAttributes(); - final Map<String, Object> body = new HashMap<>(); - for (String key : formData.names()) { - for (String value : formData.getAll(key)) { - if (headerFilterStrategy != null - && !headerFilterStrategy.applyFilterToExternalHeaders(key, value, exchange)) { - appendHeader(result.getHeaders(), key, value); - appendHeader(body, key, value); + RoutingContext ctx, Message message, HeaderFilterStrategy headerFilterStrategy) { + final boolean isMultipartFormData = isMultiPartFormData(ctx); + if (isFormUrlEncoded(ctx) || isMultipartFormData) { + final MultiMap formData = ctx.request().formAttributes(); + final Map<String, Object> body = new HashMap<>(); + for (String key : formData.names()) { + for (String value : formData.getAll(key)) { + if (headerFilterStrategy != null + && !headerFilterStrategy.applyFilterToExternalHeaders(key, value, message.getExchange())) { + appendHeader(message.getHeaders(), key, value); + appendHeader(body, key, value); + } } } - } - if (!body.isEmpty()) { - result.setBody(body); - } + if (!body.isEmpty()) { + message.setBody(body); + } - if (isMultipartFormData) { - populateAttachments(ctx.fileUploads(), result); + if (isMultipartFormData) { + populateAttachments(ctx.fileUploads(), message); + } } } diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java index 73e3e22641c..19b14a5fc47 100644 --- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java +++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java @@ -27,11 +27,17 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.vertx.core.Context; +import io.vertx.core.Future; import io.vertx.core.MultiMap; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; import io.vertx.core.net.SocketAddress; +import io.vertx.core.streams.Pump; import io.vertx.ext.web.RoutingContext; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -189,57 +195,72 @@ public final class VertxPlatformHttpSupport { return codeToUse; } - static void writeResponse( - RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy, boolean muteExceptions) - throws Exception { + static Future<Void> writeResponse( + RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy, boolean muteExceptions) { final Object body = toHttpResponse(ctx.response(), camelExchange.getMessage(), headerFilterStrategy, muteExceptions); - final HttpServerResponse response = ctx.response(); + final Promise<Void> promise = Promise.promise(); if (body == null) { LOGGER.trace("No payload to send as reply for exchange: {}", camelExchange); - response.end(); + ctx.end(); + promise.complete(); } else if (body instanceof String) { - response.end((String) body); + ctx.end((String) body); + promise.complete(); } else if (body instanceof InputStream) { - writeResponseAs(response, (InputStream) body); + writeResponseAs(promise, ctx, (InputStream) body); } else if (body instanceof Buffer) { - response.end((Buffer) body); + ctx.end((Buffer) body); + promise.complete(); } else { - writeResponseAsFallback(camelExchange, body, response); + try { + writeResponseAsFallback(promise, camelExchange, body, ctx); + } catch (IOException | NoTypeConversionAvailableException e) { + promise.fail(e); + } } + + return promise.future(); } - private static void writeResponseAsFallback(Exchange camelExchange, Object body, HttpServerResponse response) + private static void writeResponseAsFallback(Promise<Void> promise, Exchange camelExchange, Object body, RoutingContext ctx) throws NoTypeConversionAvailableException, IOException { final TypeConverter tc = camelExchange.getContext().getTypeConverter(); // Try to convert to ByteBuffer for performance reason final ByteBuffer bb = tc.tryConvertTo(ByteBuffer.class, camelExchange, body); if (bb != null) { - writeResponseAs(response, bb); + writeResponseAs(promise, ctx, bb); } else { // Otherwise fallback to most generic InputStream conversion final InputStream is = tc.mandatoryConvertTo(InputStream.class, camelExchange, body); - writeResponseAs(response, is); + writeResponseAs(promise, ctx, is); } } - private static void writeResponseAs(HttpServerResponse response, ByteBuffer bb) { + private static void writeResponseAs(Promise<Void> promise, RoutingContext ctx, ByteBuffer bb) { final Buffer b = Buffer.buffer(bb.capacity()); b.setBytes(0, bb); - response.end(b); + ctx.end(b); + promise.complete(); } - private static void writeResponseAs(HttpServerResponse response, InputStream is) throws IOException { - final byte[] bytes = new byte[4096]; - try (InputStream in = is) { - int len; - while ((len = in.read(bytes)) >= 0) { - final Buffer b = Buffer.buffer(len); - b.appendBytes(bytes, 0, len); - response.write(b); - } - } - response.end(); + private static void writeResponseAs(Promise<Void> promise, RoutingContext ctx, InputStream is) { + HttpServerResponse response = ctx.response(); + Vertx vertx = ctx.vertx(); + Context context = vertx.getOrCreateContext(); + + // Process the InputStream async to avoid blocking the Vert.x event loop on large responses + AsyncInputStream asyncInputStream = new AsyncInputStream(vertx, context, is); + asyncInputStream.exceptionHandler(promise::fail); + asyncInputStream.endHandler(event -> { + response.end().onComplete(result -> { + asyncInputStream.close(closeResult -> promise.complete()); + }); + }); + + // Pump the InputStream content into the HTTP response WriteStream + Pump pump = Pump.pump(asyncInputStream, response); + context.runOnContext(event -> pump.start()); } static void populateCamelHeaders( @@ -336,4 +357,22 @@ public final class VertxPlatformHttpSupport { value = list; return value; } + + static boolean isMultiPartFormData(RoutingContext ctx) { + return isContentTypeMatching(ctx, HttpHeaderValues.MULTIPART_FORM_DATA.toString()); + } + + static boolean isFormUrlEncoded(RoutingContext ctx) { + return isContentTypeMatching(ctx, HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString()); + } + + private static boolean isContentTypeMatching(RoutingContext ctx, String expectedContentType) { + String contentType = ctx.parsedHeaders().contentType().value(); + boolean match = false; + if (org.apache.camel.util.ObjectHelper.isNotEmpty(contentType)) { + String lowerCaseContentType = contentType.toLowerCase(); + match = lowerCaseContentType.startsWith(expectedContentType); + } + return match; + } } diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java index 633826ad959..8b5b5642ebf 100644 --- a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java +++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java @@ -880,7 +880,7 @@ public class VertxPlatformHttpEngineTest { return createCamelContext(null); } - private static CamelContext createCamelContext(ServerConfigurationCustomizer customizer) throws Exception { + static CamelContext createCamelContext(ServerConfigurationCustomizer customizer) throws Exception { int port = AvailablePortFinder.getNextAvailable(); VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration(); conf.setBindPort(port); diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpLargeMessageStreamingTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpLargeMessageStreamingTest.java new file mode 100644 index 00000000000..7c3a118a30e --- /dev/null +++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpLargeMessageStreamingTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.platform.http.vertx; + +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Random; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.util.IOHelper; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; + +import static io.restassured.RestAssured.given; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@EnabledIfSystemProperty(named = "performance-tests", matches = ".*") +public class VertxPlatformHttpLargeMessageStreamingTest { + + @Test + void testStreamingWithLargeRequestAndResponseBody() throws Exception { + final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext(); + context.getStreamCachingStrategy().setSpoolEnabled(true); + + Path input = createLargeFile(); + Path output = Files.createTempFile("platform-http-output", "dat"); + + try { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("platform-http:/streaming?useStreaming=true") + .log("Done echoing back request body as response body"); + } + }); + + context.start(); + + InputStream response = given() + .body(new FileInputStream(input.toFile())) + .post("/streaming") + .then() + .extract() + .asInputStream(); + + try (FileOutputStream fos = new FileOutputStream(output.toFile())) { + IOHelper.copy(response, fos); + } + + assertEquals(input.toFile().length(), output.toFile().length()); + } finally { + context.stop(); + Files.deleteIfExists(input); + Files.deleteIfExists(output); + } + } + + private Path createLargeFile() throws IOException { + // Create a 4GB file containing random data + Path path = Files.createTempFile("platform-http-input", "dat"); + try (FileOutputStream fos = new FileOutputStream(path.toFile())) { + Random random = new Random(); + long targetFileSize = (long) (4 * Math.pow(1024, 3)); + long bytesWritten = 0L; + + byte[] data = new byte[1024]; + while (bytesWritten < targetFileSize) { + random.nextBytes(data); + fos.write(data); + bytesWritten += data.length; + } + } + return path; + } +} diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpProxyTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpProxyTest.java index df2a47cdc9d..d940aef8ac3 100644 --- a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpProxyTest.java +++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpProxyTest.java @@ -25,7 +25,8 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.AvailablePortFinder; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; import static com.github.tomakehurst.wiremock.client.WireMock.get; @@ -55,15 +56,16 @@ public class VertxPlatformHttpProxyTest { } } - @Test - void testProxy() throws Exception { + @ParameterizedTest + @ValueSource(booleans = { false, true }) + void testProxy(boolean useStreaming) throws Exception { final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext(); try { context.addRoutes(new RouteBuilder() { @Override public void configure() { - from("platform-http:proxy") + from("platform-http:proxy?useStreaming=" + useStreaming) .toD("${headers." + Exchange.HTTP_URI + "}?bridgeEndpoint=true"); } }); diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpStreamingTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpStreamingTest.java new file mode 100644 index 00000000000..786047cbda6 --- /dev/null +++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpStreamingTest.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.platform.http.vertx; + +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; + +import io.restassured.http.ContentType; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.util.IOHelper; +import org.junit.jupiter.api.Test; + +import static io.restassured.RestAssured.given; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class VertxPlatformHttpStreamingTest { + + @Test + void testStreamingWithStringRequestAndResponseBody() throws Exception { + final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext(); + + try { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("platform-http:/streaming?useStreaming=true") + .transform().simple("Hello ${body}"); + } + }); + + context.start(); + + String requestBody = "Vert.x Platform HTTP"; + given() + .body(requestBody) + .post("/streaming") + .then() + .statusCode(200) + .body(is("Hello " + requestBody)); + } finally { + context.stop(); + } + } + + @Test + void testStreamingWithFileRequestAndResponseBody() throws Exception { + final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext(); + String content = "Hello World"; + Path testFile = Files.createTempFile("platform-http-testing", "txt"); + Files.writeString(testFile, content); + + try { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("platform-http:/streaming?useStreaming=true") + .log("Done processing request"); + } + }); + + context.start(); + + given() + .body(testFile.toFile()) + .post("/streaming") + .then() + .statusCode(200) + .body(is(content)); + } finally { + context.stop(); + Files.deleteIfExists(testFile); + } + } + + @Test + void testStreamingWithFormUrlEncodedBody() throws Exception { + final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext(); + try { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("platform-http:/streaming?useStreaming=true") + .setBody().simple("foo = ${header.foo}"); + } + }); + + context.start(); + + given() + .contentType(ContentType.URLENC) + .formParam("foo", "bar") + .post("/streaming") + .then() + .statusCode(200) + .body(is("foo = bar")); + } finally { + context.stop(); + } + } + + @Test + void testStreamingWithMultiPartRequestRejected() throws Exception { + String content = "Hello World"; + Path testFile = Files.createTempFile("platform-http-testing", "txt"); + Files.writeString(testFile, content); + + final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext(configuration -> { + VertxPlatformHttpServerConfiguration.BodyHandler bodyHandler + = new VertxPlatformHttpServerConfiguration.BodyHandler(); + // turn on file uploads + bodyHandler.setHandleFileUploads(true); + bodyHandler.setUploadsDirectory(testFile.toFile().getParent()); + configuration.setBodyHandler(bodyHandler); + }); + + try { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("platform-http:/streaming?useStreaming=true") + .setBody().constant("multipart request should have been rejected"); + } + }); + + context.start(); + + given() + .multiPart(testFile.toFile()) + .post("/streaming") + .then() + .statusCode(500); + } finally { + context.stop(); + Files.deleteIfExists(testFile); + } + } + + @Test + void testStreamingWithSpecificEncoding() throws Exception { + final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext(); + Path input = Files.createTempFile("platform-http-input", "dat"); + Path output = Files.createTempFile("platform-http-output", "dat"); + + String fileContent = "Content with special character ð"; + Files.writeString(input, fileContent, StandardCharsets.ISO_8859_1); + + try { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("platform-http:/streaming?useStreaming=true") + .log("Done echoing back request body as response body"); + } + }); + + context.start(); + + InputStream response = given() + .body(new FileInputStream(input.toFile())) + .post("/streaming") + .then() + .statusCode(200) + .extract() + .body() + .asInputStream(); + + try (FileOutputStream fos = new FileOutputStream(output.toFile())) { + IOHelper.copy(response, fos); + } + + assertEquals(fileContent, Files.readString(output, StandardCharsets.ISO_8859_1)); + } finally { + context.stop(); + } + } + + @Test + void testStreamingWithClosedInputStreamResponse() throws Exception { + final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext(); + try { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("platform-http:/streaming?useStreaming=true") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // Simulate an error processing an input stream by closing it ahead of the response being written + // Verifies the response promise.fail is called correctly + InputStream stream = getClass().getResourceAsStream("/authentication/auth.properties"); + if (stream != null) { + stream.close(); + } + exchange.getMessage().setBody(stream); + } + }); + } + }); + + context.start(); + + given() + .get("/streaming") + .then() + .statusCode(500); + } finally { + context.stop(); + } + } + + @Test + void testStreamingWithUnconvertableResponseType() throws Exception { + final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext(); + try { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("platform-http:/streaming?useStreaming=true") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // Force a type conversion exception and verify the response promise.fail is called correctly + exchange.getMessage().setBody(new TestBean()); + } + }); + } + }); + + context.start(); + + given() + .get("/streaming") + .then() + .statusCode(500); + } finally { + context.stop(); + } + } + + static final class TestBean { + } +} diff --git a/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointConfigurer.java b/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointConfigurer.java index 426e9de52d5..6223fdd295b 100644 --- a/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointConfigurer.java +++ b/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointConfigurer.java @@ -41,6 +41,8 @@ public class PlatformHttpEndpointConfigurer extends PropertyConfigurerSupport im case "platformhttpengine": case "platformHttpEngine": target.setPlatformHttpEngine(property(camelContext, org.apache.camel.component.platform.http.spi.PlatformHttpEngine.class, value)); return true; case "produces": target.setProduces(property(camelContext, java.lang.String.class, value)); return true; + case "usestreaming": + case "useStreaming": target.setUseStreaming(property(camelContext, boolean.class, value)); return true; default: return false; } } @@ -68,6 +70,8 @@ public class PlatformHttpEndpointConfigurer extends PropertyConfigurerSupport im case "platformhttpengine": case "platformHttpEngine": return org.apache.camel.component.platform.http.spi.PlatformHttpEngine.class; case "produces": return java.lang.String.class; + case "usestreaming": + case "useStreaming": return boolean.class; default: return null; } } @@ -96,6 +100,8 @@ public class PlatformHttpEndpointConfigurer extends PropertyConfigurerSupport im case "platformhttpengine": case "platformHttpEngine": return target.getPlatformHttpEngine(); case "produces": return target.getProduces(); + case "usestreaming": + case "useStreaming": return target.isUseStreaming(); default: return null; } } diff --git a/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointUriFactory.java b/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointUriFactory.java index f680a4e90a8..78a658d5a8a 100644 --- a/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointUriFactory.java +++ b/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointUriFactory.java @@ -21,7 +21,7 @@ public class PlatformHttpEndpointUriFactory extends org.apache.camel.support.com private static final Set<String> SECRET_PROPERTY_NAMES; private static final Set<String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(12); + Set<String> props = new HashSet<>(13); props.add("bridgeErrorHandler"); props.add("consumes"); props.add("exceptionHandler"); @@ -34,6 +34,7 @@ public class PlatformHttpEndpointUriFactory extends org.apache.camel.support.com props.add("path"); props.add("platformHttpEngine"); props.add("produces"); + props.add("useStreaming"); PROPERTY_NAMES = Collections.unmodifiableSet(props); SECRET_PROPERTY_NAMES = Collections.emptySet(); MULTI_VALUE_PREFIXES = Collections.emptySet(); diff --git a/components/camel-platform-http/src/generated/resources/org/apache/camel/component/platform/http/platform-http.json b/components/camel-platform-http/src/generated/resources/org/apache/camel/component/platform/http/platform-http.json index bb67bb10512..69c9b292526 100644 --- a/components/camel-platform-http/src/generated/resources/org/apache/camel/component/platform/http/platform-http.json +++ b/components/camel-platform-http/src/generated/resources/org/apache/camel/component/platform/http/platform-http.json @@ -33,11 +33,12 @@ "matchOnUriPrefix": { "index": 3, "kind": "parameter", "displayName": "Match On Uri Prefix", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether or not the consumer should try to find a target consumer by matching the URI prefix if no exact match is found." }, "muteException": { "index": 4, "kind": "parameter", "displayName": "Mute Exception", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "If enabled and an Exchange failed processing on the consumer side the response's body won't contain the exception's stack trace." }, "produces": { "index": 5, "kind": "parameter", "displayName": "Produces", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The content type this endpoint produces, such as application\/xml or application\/json." }, - "bridgeErrorHandler": { "index": 6, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] - "exceptionHandler": { "index": 7, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...] - "exchangePattern": { "index": 8, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, - "fileNameExtWhitelist": { "index": 9, "kind": "parameter", "displayName": "File Name Ext Whitelist", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "A comma or whitespace separated list of file extensions. Uploads having these extensions will be stored locally. Null value or asterisk () will allow all files." }, - "headerFilterStrategy": { "index": 10, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter headers to and from Camel message." }, - "platformHttpEngine": { "index": 11, "kind": "parameter", "displayName": "Platform Http Engine", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.platform.http.spi.PlatformHttpEngine", "deprecated": false, "autowired": false, "secret": false, "description": "An HTTP Server engine implementation to serve the requests of this endpoint." } + "useStreaming": { "index": 6, "kind": "parameter", "displayName": "Use Streaming", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to use streaming for large requests and responses" }, + "bridgeErrorHandler": { "index": 7, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] + "exceptionHandler": { "index": 8, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...] + "exchangePattern": { "index": 9, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, + "fileNameExtWhitelist": { "index": 10, "kind": "parameter", "displayName": "File Name Ext Whitelist", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "A comma or whitespace separated list of file extensions. Uploads having these extensions will be stored locally. Null value or asterisk () will allow all files." }, + "headerFilterStrategy": { "index": 11, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter headers to and from Camel message." }, + "platformHttpEngine": { "index": 12, "kind": "parameter", "displayName": "Platform Http Engine", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.platform.http.spi.PlatformHttpEngine", "deprecated": false, "autowired": false, "secret": false, "description": "An HTTP Server engine implementation to serve the requests of this endpoint." } } } diff --git a/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpEndpoint.java b/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpEndpoint.java index 6c3a3e81de1..2b12a69e88c 100644 --- a/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpEndpoint.java +++ b/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpEndpoint.java @@ -69,6 +69,9 @@ public class PlatformHttpEndpoint extends DefaultEndpoint implements AsyncEndpoi @UriParam(label = "advanced", description = "To use a custom HeaderFilterStrategy to filter headers to and from Camel message.") private HeaderFilterStrategy headerFilterStrategy = new PlatformHttpHeaderFilterStrategy(); + @UriParam(label = "consumer", defaultValue = "false", + description = "Whether to use streaming for large requests and responses") + private boolean useStreaming; public PlatformHttpEndpoint(String uri, String remaining, Component component) { super(uri, component); @@ -168,6 +171,14 @@ public class PlatformHttpEndpoint extends DefaultEndpoint implements AsyncEndpoi this.muteException = muteException; } + public boolean isUseStreaming() { + return useStreaming; + } + + public void setUseStreaming(boolean useStreaming) { + this.useStreaming = useStreaming; + } + PlatformHttpEngine getOrCreateEngine() { return platformHttpEngine != null ? platformHttpEngine diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PlatformHttpEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PlatformHttpEndpointBuilderFactory.java index 8a6dd245fcc..c5db0313347 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PlatformHttpEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PlatformHttpEndpointBuilderFactory.java @@ -159,6 +159,37 @@ public interface PlatformHttpEndpointBuilderFactory { doSetProperty("produces", produces); return this; } + /** + * Whether to use streaming for large requests and responses. + * + * The option is a: <code>boolean</code> type. + * + * Default: false + * Group: consumer + * + * @param useStreaming the value to set + * @return the dsl builder + */ + default PlatformHttpEndpointBuilder useStreaming(boolean useStreaming) { + doSetProperty("useStreaming", useStreaming); + return this; + } + /** + * Whether to use streaming for large requests and responses. + * + * The option will be converted to a <code>boolean</code> + * type. + * + * Default: false + * Group: consumer + * + * @param useStreaming the value to set + * @return the dsl builder + */ + default PlatformHttpEndpointBuilder useStreaming(String useStreaming) { + doSetProperty("useStreaming", useStreaming); + return this; + } } /**