This is an automated email from the ASF dual-hosted git repository. imaxon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 34351729058b19a530b9b4aa193433141840c352 Author: Michael Blow <mb...@apache.org> AuthorDate: Wed May 19 08:00:00 2021 -0400 [NO ISSUE][HYR][HTTP] http stream handler -> consumer / processor Change-Id: I7cd7622dbee880845d0b4233ce3a3b17af15eebc Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11524 Reviewed-by: Michael Blow <mb...@apache.org> Reviewed-by: Ian Maxon <ima...@uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> --- .../org/apache/hyracks/http/server/utils/HttpUtil.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java index c61e0d2..835cd54 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java @@ -45,6 +45,7 @@ import org.apache.hyracks.http.server.BaseRequest; import org.apache.hyracks.http.server.FormUrlEncodedRequest; import org.apache.hyracks.http.server.HttpServer; import org.apache.hyracks.util.ThrowingConsumer; +import org.apache.hyracks.util.ThrowingFunction; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -191,16 +192,23 @@ public class HttpUtil { return i < 0 ? uri : uri.substring(0, i); } - public static void handleStreamInterruptibly(CloseableHttpResponse response, + public static void consumeStreamInterruptibly(CloseableHttpResponse response, ThrowingConsumer<Reader> streamProcessor, ExecutorService executor, Supplier<String> descriptionSupplier) + throws InterruptedException, ExecutionException, IOException { + processStreamInterruptibly(response, ThrowingConsumer.asFunction(streamProcessor), executor, + descriptionSupplier); + } + + public static <T> T processStreamInterruptibly(CloseableHttpResponse response, + ThrowingFunction<Reader, T> streamProcessor, ExecutorService executor, Supplier<String> descriptionSupplier) throws IOException, InterruptedException, ExecutionException { // we have to consume the stream in a separate thread, as it not stop on interrupt; we need to // instead close the connection to achieve the interrupt String description = descriptionSupplier.get(); - Future<Void> readFuture = executor.submit(() -> { + Future<T> readFuture = executor.submit(() -> { Thread.currentThread().setName(description); InputStreamReader reader = new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8); - streamProcessor.process(new Reader() { + return streamProcessor.process(new Reader() { @Override public int read(char[] cbuf, int off, int len) throws IOException { return reader.read(cbuf, off, len); @@ -213,10 +221,9 @@ public class HttpUtil { LOGGER.debug("ignoring close on {}", reader); } }); - return null; }); try { - readFuture.get(); + return readFuture.get(); } catch (InterruptedException ex) { // NOSONAR -- interrupt or rethrow response.close(); try {