abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1991
Change subject: [NO ISSUE][OTH] Improve Http Server ...................................................................... [NO ISSUE][OTH] Improve Http Server - user model changes: no - storage format changes: no - interface changes: no details: - Log estimated input memory budget - Ensure all allocated input memory buffers are 4K. This reduces the chance of memory allocation to go beyond budget. This also allows input and output to share buffers of this size since it is the size of choice for reading and writing. - Reject requests that go beyond server capacity before reading them which reduces wasted resources. Change-Id: I7adcd59047805dc384e1c119191eff995c6e9a7a --- M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/WebManager.java M hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java M hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java 6 files changed, 165 insertions(+), 89 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/91/1991/1 diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java index 0066b77..415a656 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java @@ -88,7 +88,9 @@ try { flush(); } finally { - buffer.release(); + if (buffer != null) { + buffer.release(); + } } } else { response.fullReponse(buffer); @@ -101,18 +103,21 @@ @Override public void flush() throws IOException { ensureWritable(); - if (buffer.readableBytes() > 0) { - if (response.status() == HttpResponseStatus.OK) { - int size = buffer.capacity(); - response.beforeFlush(); - DefaultHttpContent content = new DefaultHttpContent(buffer); - ctx.write(content, ctx.channel().voidPromise()); - buffer = ctx.alloc().buffer(size); - } else { - ByteBuf aBuffer = ctx.alloc().buffer(buffer.readableBytes()); - aBuffer.writeBytes(buffer); - response.error(aBuffer); - buffer.clear(); + if (buffer != null) { + if (buffer.readableBytes() > 0) { + if (response.status() == HttpResponseStatus.OK) { + int size = buffer.capacity(); + response.beforeFlush(); + DefaultHttpContent content = new DefaultHttpContent(buffer); + buffer = null; + ctx.writeAndFlush(content, ctx.channel().voidPromise()); + buffer = ctx.alloc().buffer(size); + } else { + ByteBuf aBuffer = ctx.alloc().buffer(buffer.readableBytes()); + aBuffer.writeBytes(buffer); + response.error(aBuffer); + buffer.clear(); + } } } } @@ -120,7 +125,6 @@ private synchronized void ensureWritable() throws IOException { while (!ctx.channel().isWritable()) { try { - ctx.flush(); if (!ctx.channel().isOpen()) { throw new IOException("Closed channel"); } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java index 56f454f..138b612 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java @@ -18,16 +18,20 @@ */ package org.apache.hyracks.http.server; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryPoolMXBean; +import java.lang.management.MemoryType; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; @@ -38,11 +42,13 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; +import io.netty.util.internal.PlatformDependent; public class HttpServer { // Constants @@ -50,12 +56,17 @@ private static final int HIGH_WRITE_BUFFER_WATER_MARK = 32 * 1024; protected static final WriteBufferWaterMark WRITE_BUFFER_WATER_MARK = new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, HIGH_WRITE_BUFFER_WATER_MARK); + protected static final int RECEIVE_BUFFER_SIZE = 4096; + protected static final int DEFAULT_NUM_EXECUTOR_THREADS = 16; + protected static final int DEFAULT_REQUEST_QUEUE_SIZE = 256; private static final Logger LOGGER = Logger.getLogger(HttpServer.class.getName()); private static final int FAILED = -1; private static final int STOPPED = 0; private static final int STARTING = 1; private static final int STARTED = 2; private static final int STOPPING = 3; + // Static + private static long maxMemUsage = 0L; // Final members private final Object lock = new Object(); private final AtomicInteger threadId = new AtomicInteger(); @@ -65,14 +76,14 @@ private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; private final int port; - private final ExecutorService executor; + private final ThreadPoolExecutor executor; // Mutable members private volatile int state = STOPPED; private Channel channel; private Throwable cause; public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port) { - this(bossGroup, workerGroup, port, 16, 256); + this(bossGroup, workerGroup, port, DEFAULT_NUM_EXECUTOR_THREADS, DEFAULT_REQUEST_QUEUE_SIZE); } public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, int numExecutorThreads, @@ -87,7 +98,15 @@ runnable -> new Thread(runnable, "HttpExecutor(port:" + port + ")-" + threadId.getAndIncrement())); long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK + numExecutorThreads * HttpServerInitializer.RESPONSE_CHUNK_SIZE; - LOGGER.log(Level.INFO, "The direct memory budget for this server is " + directMemoryBudget + " bytes"); + LOGGER.log(Level.INFO, "The output direct memory budget for this server is " + directMemoryBudget + " bytes"); + long inputBudgetEstimate = ((long) HttpServerInitializer.MAX_REQUEST_INITIAL_LINE_LENGTH + * (requestQueueSize + numExecutorThreads)); + inputBudgetEstimate = inputBudgetEstimate << 1; + LOGGER.log(Level.INFO, + "The \"estimated\" input direct memory budget for this server is " + inputBudgetEstimate + " bytes"); + LOGGER.log(Level.INFO, "Multiple arenas, memory fragments, and local thread cached buffers " + + "can cause the input memory usage to exceed estimate"); + LOGGER.log(Level.INFO, "Custom Buffer allocator must be used to avoid this"); } public final void start() throws Exception { // NOSONAR @@ -199,6 +218,7 @@ Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length()); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE)) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK) .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new HttpServerInitializer(this)); @@ -264,7 +284,7 @@ return new HttpServerHandler<>(this, chunkSize); } - public ExecutorService getExecutor() { + public ThreadPoolExecutor getExecutor() { return executor; } @@ -275,4 +295,55 @@ public int getWorkQueueSize() { return workQueue.size(); } + + @SuppressWarnings("restriction") + public synchronized static void printMemUsage() { + StringBuilder report = new StringBuilder(); + report.append("sun.misc.VM.maxDirectMemory: "); + report.append(sun.misc.VM.maxDirectMemory()); + report.append('\n'); + report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed(): "); + report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed()); + report.append('\n'); + report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity(): "); + report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity()); + report.append('\n'); + report.append("ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage(): "); + report.append(ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage()); + report.append('\n'); + report.append("---------------------------- Beans ----------------------------"); + report.append('\n'); + List<MemoryPoolMXBean> memPoolBeans = ManagementFactory.getMemoryPoolMXBeans(); + for (MemoryPoolMXBean bean : memPoolBeans) { + if (bean.isValid() && bean.getType() == MemoryType.NON_HEAP) { + report.append(bean.getName()); + report.append(": "); + report.append(bean.getUsage()); + report.append('\n'); + } + } + report.append("---------------------------- Netty ----------------------------"); + report.append('\n'); + try { + Field field = PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER"); + field.setAccessible(true); + AtomicLong usedDirectMemory = (AtomicLong) field.get(null); + long used = usedDirectMemory.get(); + report.append("Current PlatformDependent.DIRECT_MEMORY_COUNTER: "); + report.append(used); + report.append('\n'); + report.append("Maximum PlatformDependent.DIRECT_MEMORY_COUNTER: "); + maxMemUsage = Math.max(maxMemUsage, used); + report.append(maxMemUsage); + report.append('\n'); + report.append('\n'); + } catch (Throwable th) { + th.printStackTrace(); + LOGGER.log(Level.WARNING, "Failed to access PlatformDependent.DIRECT_MEMORY_COUNTER", th); + return; + } + report.append("--------------- PooledByteBufAllocator.DEFAULT ----------------"); + report.append(PooledByteBufAllocator.DEFAULT.dumpStats()); + LOGGER.log(Level.INFO, report.toString()); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java index a32da39..6e523cf 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java @@ -18,12 +18,20 @@ */ package org.apache.hyracks.http.server; +import java.util.concurrent.atomic.AtomicInteger; + +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; public class HttpServerInitializer extends ChannelInitializer<SocketChannel> { @@ -31,10 +39,34 @@ public static final int MAX_REQUEST_HEADER_SIZE = 262144; public static final int MAX_REQUEST_INITIAL_LINE_LENGTH = 131072; public static final int RESPONSE_CHUNK_SIZE = 4096; - private HttpServer server; + public static final DefaultFullHttpResponse response = + new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE); + private final HttpServer server; + private final AtomicInteger remainingCapacity; public HttpServerInitializer(HttpServer server) { this.server = server; + remainingCapacity = new AtomicInteger( + server.getExecutor().getQueue().remainingCapacity() + server.getExecutor().getMaximumPoolSize()); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + // This is the place to do input control + if (remainingCapacity.decrementAndGet() < 0) { + remainingCapacity.incrementAndGet(); + HttpResponseEncoder encoder = new HttpResponseEncoder(); + ChannelPromise promise = ctx.voidPromise(); + response.retain(); + encoder.write(ctx, response, promise); + encoder.flush(ctx); + promise.addListener(ChannelFutureListener.CLOSE); + return; + } else { + // increment on channel close + ctx.channel().closeFuture().addListener(future -> remainingCapacity.incrementAndGet()); + super.handlerAdded(ctx); + } } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/WebManager.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/WebManager.java index 4a09f78..4d624df 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/WebManager.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/WebManager.java @@ -30,10 +30,36 @@ private final EventLoopGroup bosses; private final EventLoopGroup workers; + /** + * Create a web manager with number of bosses = 1 and number of workers = MultithreadEventLoopGroup.DEFAULT_EVENT_LOOP_THREADS + * The default can be set using -Dio.netty.eventLoopThreads, otherwise, it is set to Runtime.getRuntime().availableProcessors() * 2 + */ public WebManager() { + this(1, 0); + } + + /** + * Create a web manager with number of bosses = 1 and number of workers = numWorkers + * + * @param numWorkers + * number of worker threads + */ + public WebManager(int numWorkers) { + this(1, numWorkers); + } + + /** + * Create a web manager with number of bosses = numBosses and number of workers = numWorkers + * + * @param numBosses + * number of boss threads + * @param numWorkers + * number of worker threads + */ + public WebManager(int numBosses, int numWorkers) { servers = new ArrayList<>(); - bosses = new NioEventLoopGroup(1); - workers = new NioEventLoopGroup(); + bosses = new NioEventLoopGroup(numBosses); + workers = new NioEventLoopGroup(numWorkers); } public List<HttpServer> getServers() { diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java index bf0452b..ffbf0bc 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java @@ -18,28 +18,20 @@ */ package org.apache.hyracks.http.servlet; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryPoolMXBean; -import java.lang.management.MemoryType; -import java.lang.reflect.Field; -import java.util.List; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.AbstractServlet; +import org.apache.hyracks.http.server.HttpServer; import org.apache.hyracks.http.server.utils.HttpUtil; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.util.internal.PlatformDependent; public class ChattyServlet extends AbstractServlet { private static final Logger LOGGER = Logger.getLogger(ChattyServlet.class.getName()); - private static long MAX = 0L; private byte[] bytes; public ChattyServlet(ConcurrentMap<String, Object> ctx, String[] paths) { @@ -66,57 +58,6 @@ for (int i = 0; i < 100; i++) { response.outputStream().write(bytes); } - printMemUsage(); - } - - @SuppressWarnings("restriction") - public synchronized static void printMemUsage() { - StringBuilder report = new StringBuilder(); - report.append("sun.misc.VM.maxDirectMemory: "); - report.append(sun.misc.VM.maxDirectMemory()); - report.append('\n'); - report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed(): "); - report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed()); - report.append('\n'); - report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity(): "); - report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity()); - report.append('\n'); - report.append("ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage(): "); - report.append(ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage()); - report.append('\n'); - report.append("---------------------------- Beans ----------------------------"); - report.append('\n'); - List<MemoryPoolMXBean> memPoolBeans = ManagementFactory.getMemoryPoolMXBeans(); - for (MemoryPoolMXBean bean : memPoolBeans) { - if (bean.isValid() && bean.getType() == MemoryType.NON_HEAP) { - report.append(bean.getName()); - report.append(": "); - report.append(bean.getUsage()); - report.append('\n'); - } - } - report.append("---------------------------- Netty ----------------------------"); - report.append('\n'); - try { - Field field = PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER"); - field.setAccessible(true); - AtomicLong usedDirectMemory = (AtomicLong) field.get(null); - long used = usedDirectMemory.get(); - report.append("Current PlatformDependent.DIRECT_MEMORY_COUNTER: "); - report.append(used); - report.append('\n'); - report.append("Maximum PlatformDependent.DIRECT_MEMORY_COUNTER: "); - MAX = Math.max(MAX, used); - report.append(MAX); - report.append('\n'); - report.append('\n'); - } catch (Throwable th) { - th.printStackTrace(); - LOGGER.log(Level.WARNING, "Failed to access PlatformDependent.DIRECT_MEMORY_COUNTER", th); - return; - } - report.append("--------------- PooledByteBufAllocator.DEFAULT ----------------"); - report.append(PooledByteBufAllocator.DEFAULT.dumpStats()); - LOGGER.log(Level.INFO, report.toString()); + HttpServer.printMemUsage(); } } \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java index 66d1b77..bc50b42 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java @@ -77,7 +77,7 @@ WebManager webMgr = new WebManager(); int numExecutors = 16; int serverQueueSize = 16; - int numRequests = 48; + int numRequests = 128; HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize); SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH }); @@ -111,7 +111,7 @@ } private void waitTillQueued(HttpServer server, int expectedQueued) throws Exception { - int maxAttempts = 5; + int maxAttempts = 15; int attempt = 0; int queued = server.getWorkQueueSize(); while (queued != expectedQueued) { @@ -144,7 +144,7 @@ try { try { for (int i = 0; i < numPatches; i++) { - ChattyServlet.printMemUsage(); + HttpServer.printMemUsage(); request(numRequests); for (Future<Void> f : FUTURES) { f.get(); @@ -152,7 +152,7 @@ FUTURES.clear(); } } finally { - ChattyServlet.printMemUsage(); + HttpServer.printMemUsage(); servlet.wakeUp(); for (Future<Void> f : stuck) { f.get(); @@ -174,7 +174,7 @@ int numRequests = 64; int numExecutors = 32; int serverQueueSize = 32; - ChattyServlet.printMemUsage(); + HttpServer.printMemUsage(); WebManager webMgr = new WebManager(); HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize); @@ -191,7 +191,9 @@ Assert.assertEquals(0, UNAVAILABLE_COUNT.get()); Assert.assertEquals(0, OTHER_COUNT.get()); } finally { + HttpServer.printMemUsage(); webMgr.stop(); + HttpServer.printMemUsage(); } } @@ -291,7 +293,7 @@ URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null); RequestBuilder builder = RequestBuilder.post(uri); StringBuilder str = new StringBuilder(); - for (int i = 0; i < 32; i++) { + for (int i = 0; i < 2046; i++) { str.append("This is a string statement that will be ignored"); str.append('\n'); } -- To view, visit https://asterix-gerrit.ics.uci.edu/1991 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I7adcd59047805dc384e1c119191eff995c6e9a7a Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>