This is an automated email from the ASF dual-hosted git repository.

imaxon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 34351729058b19a530b9b4aa193433141840c352
Author: Michael Blow <mb...@apache.org>
AuthorDate: Wed May 19 08:00:00 2021 -0400

    [NO ISSUE][HYR][HTTP] http stream handler -> consumer / processor
    
    Change-Id: I7cd7622dbee880845d0b4233ce3a3b17af15eebc
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11524
    Reviewed-by: Michael Blow <mb...@apache.org>
    Reviewed-by: Ian Maxon <ima...@uci.edu>
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
---
 .../org/apache/hyracks/http/server/utils/HttpUtil.java  | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index c61e0d2..835cd54 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -45,6 +45,7 @@ import org.apache.hyracks.http.server.BaseRequest;
 import org.apache.hyracks.http.server.FormUrlEncodedRequest;
 import org.apache.hyracks.http.server.HttpServer;
 import org.apache.hyracks.util.ThrowingConsumer;
+import org.apache.hyracks.util.ThrowingFunction;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -191,16 +192,23 @@ public class HttpUtil {
         return i < 0 ? uri : uri.substring(0, i);
     }
 
-    public static void handleStreamInterruptibly(CloseableHttpResponse 
response,
+    public static void consumeStreamInterruptibly(CloseableHttpResponse 
response,
             ThrowingConsumer<Reader> streamProcessor, ExecutorService 
executor, Supplier<String> descriptionSupplier)
+            throws InterruptedException, ExecutionException, IOException {
+        processStreamInterruptibly(response, 
ThrowingConsumer.asFunction(streamProcessor), executor,
+                descriptionSupplier);
+    }
+
+    public static <T> T processStreamInterruptibly(CloseableHttpResponse 
response,
+            ThrowingFunction<Reader, T> streamProcessor, ExecutorService 
executor, Supplier<String> descriptionSupplier)
             throws IOException, InterruptedException, ExecutionException {
         // we have to consume the stream in a separate thread, as it not stop 
on interrupt; we need to
         // instead close the connection to achieve the interrupt
         String description = descriptionSupplier.get();
-        Future<Void> readFuture = executor.submit(() -> {
+        Future<T> readFuture = executor.submit(() -> {
             Thread.currentThread().setName(description);
             InputStreamReader reader = new 
InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8);
-            streamProcessor.process(new Reader() {
+            return streamProcessor.process(new Reader() {
                 @Override
                 public int read(char[] cbuf, int off, int len) throws 
IOException {
                     return reader.read(cbuf, off, len);
@@ -213,10 +221,9 @@ public class HttpUtil {
                     LOGGER.debug("ignoring close on {}", reader);
                 }
             });
-            return null;
         });
         try {
-            readFuture.get();
+            return readFuture.get();
         } catch (InterruptedException ex) { // NOSONAR -- interrupt or rethrow
             response.close();
             try {

Reply via email to