abdullah alamoudi has submitted this change and it was merged. Change subject: [NO ISSUE][OTH] Enable adding request channel close listener ......................................................................
[NO ISSUE][OTH] Enable adding request channel close listener - user model changes: no - storage format changes: no - interface changes: yes - Introduce IChannelCloseHandler.handle that gets called when the request channel is closed. - Add HttpServer.getChannelCloseHandler - Add IServlet.getChannelCloseHandler details: - Previously, we didn't know that an Http client closed the connection until we try to write and find that the channel has been closed. - After this change, the moment the channel is closed, the http channel close handler is called. - A test is added with a handler that interrupts the execution. Change-Id: I42f1857c0158af6f447282cab8fbd600767b08d5 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1972 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Michael Blow <mb...@apache.org> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> --- A hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IChannelClosedHandler.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java M hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java A hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java M hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java 7 files changed, 249 insertions(+), 79 deletions(-) Approvals: Anon. E. Moose #1000171: Jenkins: Verified; No violations found; ; Verified Michael Blow: Looks good to me, approved diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IChannelClosedHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IChannelClosedHandler.java new file mode 100644 index 0000000..4e433ad --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IChannelClosedHandler.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.http.api; + +import java.util.concurrent.Future; + +import org.apache.hyracks.http.server.HttpServer; + +@FunctionalInterface +public interface IChannelClosedHandler { + + /** + * Handle a request channel closed event + * + * @param server + * the server handling the request + * @param servlet + * the servlet handling the request + * @param task + * the task handling the request + */ + void channelClosed(HttpServer server, IServlet servlet, Future<Void> task); +} diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java index 157eef5..186fb0e 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java @@ -20,6 +20,8 @@ import java.util.concurrent.ConcurrentMap; +import org.apache.hyracks.http.server.HttpServer; + /** * Represents a component that handles IServlet requests */ @@ -42,4 +44,15 @@ * @param response */ void handle(IServletRequest request, IServletResponse response); + + /** + * Get the handler for channel close events + * + * @param server + * the http server + * @return the handler for channel close events + */ + default IChannelClosedHandler getChannelClosedHandler(HttpServer server) { + return server.getChannelClosedHandler(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java index 8ce1d70..d971a7c 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hyracks.http.api.IChannelClosedHandler; import org.apache.hyracks.http.api.IServlet; import org.apache.hyracks.util.ThreadDumpUtil; import org.apache.logging.log4j.Level; @@ -62,6 +63,7 @@ private static final int STARTED = 2; private static final int STOPPING = 3; // Final members + private final IChannelClosedHandler closedHandler; private final Object lock = new Object(); private final AtomicInteger threadId = new AtomicInteger(); private final ConcurrentMap<String, Object> ctx; @@ -78,14 +80,25 @@ private Throwable cause; public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port) { - this(bossGroup, workerGroup, port, DEFAULT_NUM_EXECUTOR_THREADS, DEFAULT_REQUEST_QUEUE_SIZE); + this(bossGroup, workerGroup, port, DEFAULT_NUM_EXECUTOR_THREADS, DEFAULT_REQUEST_QUEUE_SIZE, null); + } + + public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, + IChannelClosedHandler closeHandler) { + this(bossGroup, workerGroup, port, DEFAULT_NUM_EXECUTOR_THREADS, DEFAULT_REQUEST_QUEUE_SIZE, closeHandler); } public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, int numExecutorThreads, int requestQueueSize) { + this(bossGroup, workerGroup, port, numExecutorThreads, requestQueueSize, null); + } + + public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, int numExecutorThreads, + int requestQueueSize, IChannelClosedHandler closeHandler) { this.bossGroup = bossGroup; this.workerGroup = workerGroup; this.port = port; + this.closedHandler = closeHandler; ctx = new ConcurrentHashMap<>(); servlets = new ArrayList<>(); workQueue = new LinkedBlockingQueue<>(requestQueueSize); @@ -378,6 +391,10 @@ return workQueue.size(); } + public IChannelClosedHandler getChannelClosedHandler() { + return closedHandler; + } + @Override public String toString() { return "{\"class\":\"" + getClass().getSimpleName() + "\",\"port\":" + port + ",\"state\":\"" + getState() diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java index 9290cdf..2787b30 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java @@ -19,8 +19,10 @@ package org.apache.hyracks.http.server; import java.io.IOException; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import org.apache.hyracks.http.api.IChannelClosedHandler; import org.apache.hyracks.http.api.IServlet; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.server.utils.HttpUtil; @@ -92,12 +94,16 @@ return; } handler = new HttpRequestHandler(ctx, servlet, servletRequest, chunkSize); - submit(); + submit(ctx, servlet); } - private void submit() throws IOException { + private void submit(ChannelHandlerContext ctx, IServlet servlet) throws IOException { try { - server.getExecutor(handler).submit(handler); + Future<Void> task = server.getExecutor(handler).submit(handler); + final IChannelClosedHandler closeHandler = servlet.getChannelClosedHandler(server); + if (closeHandler != null) { + ctx.channel().closeFuture().addListener(future -> closeHandler.channelClosed(server, servlet, task)); + } } catch (RejectedExecutionException e) { // NOSONAR LOGGER.log(Level.WARN, "Request rejected by server executor service. " + e.getMessage()); handler.reject(); diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java index 6bfa0cf..2a5a0a9 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java @@ -31,6 +31,7 @@ public class SleepyServlet extends AbstractServlet { private volatile boolean sleep = true; + private int numSlept = 0; public SleepyServlet(ConcurrentMap<String, Object> ctx, String[] paths) { super(ctx, paths); @@ -46,8 +47,11 @@ response.setStatus(HttpResponseStatus.OK); if (sleep) { synchronized (this) { - while (sleep) { - this.wait(); + if (sleep) { + incrementSleptCount(); + while (sleep) { + this.wait(); + } } } } @@ -55,6 +59,15 @@ response.outputStream().write("I am playing hard to get".getBytes(StandardCharsets.UTF_8)); } + private void incrementSleptCount() { + numSlept++; + notifyAll(); + } + + public int getNumSlept() { + return numSlept; + } + public synchronized void wakeUp() { sleep = false; notifyAll(); diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java new file mode 100644 index 0000000..17f6f9a --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.http.test; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Callable; + +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.client.methods.RequestBuilder; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.client.StandardHttpRequestRetryHandler; + +import io.netty.handler.codec.http.HttpResponseStatus; + +public class HttpRequestTask implements Callable<Void> { + + protected final HttpUriRequest request; + + protected HttpRequestTask() throws URISyntaxException { + request = post(null); + } + + @Override + public Void call() throws Exception { + try { + HttpResponse response = executeHttpRequest(request); + if (response.getStatusLine().getStatusCode() == HttpResponseStatus.OK.code()) { + HttpServerTest.SUCCESS_COUNT.incrementAndGet(); + } else if (response.getStatusLine().getStatusCode() == HttpResponseStatus.SERVICE_UNAVAILABLE.code()) { + HttpServerTest.UNAVAILABLE_COUNT.incrementAndGet(); + } else { + HttpServerTest.OTHER_COUNT.incrementAndGet(); + } + InputStream in = response.getEntity().getContent(); + if (HttpServerTest.PRINT_TO_CONSOLE) { + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + String line = null; + while ((line = reader.readLine()) != null) { + System.out.println(line); + } + } + IOUtils.closeQuietly(in); + } catch (Throwable th) { + th.printStackTrace(); + throw th; + } + return null; + } + + protected HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception { + HttpClient client = HttpClients.custom().setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE).build(); + try { + return client.execute(method); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + } + + protected HttpUriRequest get(String query) throws URISyntaxException { + URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, HttpServerTest.PORT, HttpServerTest.PATH, + query, null); + RequestBuilder builder = RequestBuilder.get(uri); + builder.setCharset(StandardCharsets.UTF_8); + return builder.build(); + } + + protected HttpUriRequest post(String query) throws URISyntaxException { + URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, HttpServerTest.PORT, HttpServerTest.PATH, + query, null); + RequestBuilder builder = RequestBuilder.post(uri); + StringBuilder str = new StringBuilder(); + for (int i = 0; i < 32; i++) { + str.append("This is a string statement that will be ignored"); + str.append('\n'); + } + String statement = str.toString(); + builder.setHeader("Content-type", "application/x-www-form-urlencoded"); + builder.addParameter("statement", statement); + builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8)); + builder.setCharset(StandardCharsets.UTF_8); + return builder.build(); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java index 298d2de..7e6ccf4 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java @@ -19,15 +19,12 @@ package org.apache.hyracks.http.test; import java.io.BufferedReader; -import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintWriter; import java.lang.reflect.Field; import java.net.InetAddress; import java.net.Socket; -import java.net.URI; import java.net.URISyntaxException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -35,14 +32,6 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.io.IOUtils; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpUriRequest; -import org.apache.http.client.methods.RequestBuilder; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.impl.client.StandardHttpRequestRetryHandler; import org.apache.hyracks.http.server.HttpServer; import org.apache.hyracks.http.server.WebManager; import org.apache.hyracks.http.server.utils.HttpUtil; @@ -69,6 +58,7 @@ static final AtomicInteger UNAVAILABLE_COUNT = new AtomicInteger(); static final AtomicInteger OTHER_COUNT = new AtomicInteger(); static final AtomicInteger EXCEPTION_COUNT = new AtomicInteger(); + static final List<HttpRequestTask> TASKS = new ArrayList<>(); static final List<Future<Void>> FUTURES = new ArrayList<>(); static final ExecutorService executor = Executors.newCachedThreadPool(); @@ -78,6 +68,8 @@ UNAVAILABLE_COUNT.set(0); OTHER_COUNT.set(0); EXCEPTION_COUNT.set(0); + FUTURES.clear(); + TASKS.clear(); } @Test @@ -303,76 +295,57 @@ } } + @Test + public void testInterruptOnClientClose() throws Exception { + WebManager webMgr = new WebManager(); + int numExecutors = 1; + int queueSize = 1; + HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, queueSize, + (reqServer, reqServlet, reqTask) -> reqTask.cancel(true)); + SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH }); + server.addServlet(servlet); + webMgr.add(server); + webMgr.start(); + try { + request(1); + synchronized (servlet) { + while (servlet.getNumSlept() == 0) { + servlet.wait(); + } + } + request(1); + waitTillQueued(server, 1); + FUTURES.remove(0); + HttpRequestTask request = TASKS.remove(0); + request.request.abort(); + waitTillQueued(server, 0); + synchronized (servlet) { + while (servlet.getNumSlept() == 1) { + servlet.wait(); + } + } + servlet.wakeUp(); + for (Future<Void> f : FUTURES) { + f.get(); + } + FUTURES.clear(); + } finally { + webMgr.stop(); + } + } + public static void setPrivateField(Object obj, String filedName, Object value) throws Exception { Field f = obj.getClass().getDeclaredField(filedName); f.setAccessible(true); f.set(obj, value); } - private void request(int count) { + private void request(int count) throws URISyntaxException { for (int i = 0; i < count; i++) { - Future<Void> next = executor.submit(() -> { - try { - HttpUriRequest request = post(null); - HttpResponse response = executeHttpRequest(request); - if (response.getStatusLine().getStatusCode() == HttpResponseStatus.OK.code()) { - SUCCESS_COUNT.incrementAndGet(); - } else if (response.getStatusLine().getStatusCode() == HttpResponseStatus.SERVICE_UNAVAILABLE - .code()) { - UNAVAILABLE_COUNT.incrementAndGet(); - } else { - OTHER_COUNT.incrementAndGet(); - } - InputStream in = response.getEntity().getContent(); - if (PRINT_TO_CONSOLE) { - BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - String line = null; - while ((line = reader.readLine()) != null) { - System.out.println(line); - } - } - IOUtils.closeQuietly(in); - } catch (Throwable th) { - // Server closed connection before we complete writing.. - EXCEPTION_COUNT.incrementAndGet(); - } - return null; - }); + HttpRequestTask requestTask = new HttpRequestTask(); + Future<Void> next = executor.submit(requestTask); FUTURES.add(next); + TASKS.add(requestTask); } - } - - public static HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception { - HttpClient client = HttpClients.custom().setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE).build(); - try { - return client.execute(method); - } catch (Exception e) { - e.printStackTrace(); - throw e; - } - } - - public static HttpUriRequest get(String protocol, String host, int port, String path, String query) - throws URISyntaxException { - URI uri = new URI(protocol, null, host, port, path, query, null); - RequestBuilder builder = RequestBuilder.get(uri); - builder.setCharset(StandardCharsets.UTF_8); - return builder.build(); - } - - protected HttpUriRequest post(String query) throws URISyntaxException { - URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null); - RequestBuilder builder = RequestBuilder.post(uri); - StringBuilder str = new StringBuilder(); - for (int i = 0; i < 2046; i++) { - str.append("This is a string statement that will be ignored"); - str.append('\n'); - } - String statement = str.toString(); - builder.setHeader("Content-type", "application/x-www-form-urlencoded"); - builder.addParameter("statement", statement); - builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8)); - builder.setCharset(StandardCharsets.UTF_8); - return builder.build(); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1972 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I42f1857c0158af6f447282cab8fbd600767b08d5 Gerrit-PatchSet: 8 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Dmitry Lychagin <dmitry.lycha...@couchbase.com> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Michael Blow <mb...@apache.org> Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: Till Westmann <ti...@apache.org> Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>