abdullah alamoudi has submitted this change and it was merged. Change subject: [ASTERIXDB-2039][OTH] Log Http Server direct memory budget ......................................................................
[ASTERIXDB-2039][OTH] Log Http Server direct memory budget - user model changes: no - storage format changes: no - interface changes: no details: - Calculate mem budget = number of executors x max high watermark. - Log the calculated budget. Change-Id: I4a324f10db52e77c7e69ca4246b9d84b4479f25d Reviewed-on: https://asterix-gerrit.ics.uci.edu/1941 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java M hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java M hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java M hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java 7 files changed, 141 insertions(+), 50 deletions(-) Approvals: Jenkins: Verified; No violations found; ; Verified Murtadha Hubail: Looks good to me, approved diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java index 9ace417..6041ab5 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java +++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java @@ -72,7 +72,7 @@ @Override public String toString() { - return ActivePartitionMessage.class.getSimpleName() + event; + return ActivePartitionMessage.class.getSimpleName() + '-' + event; } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index e30272c..c6f41bf 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -362,7 +362,16 @@ cancelRecovery = false; setState(ActivityState.TEMPORARILY_FAILED); LOGGER.log(level, "Recovery task has been submitted"); - recoveryTask = executor.submit(() -> doRecover(policy)); + recoveryTask = executor.submit(() -> { + String nameBefore = Thread.currentThread().getName(); + try { + Thread.currentThread().setName("RecoveryTask (" + entityId + ")"); + doRecover(policy); + } finally { + Thread.currentThread().setName(nameBefore); + } + return null; + }); } } @@ -378,11 +387,13 @@ synchronized (this) { if (cancelRecovery) { recoveryTask = null; + notifyAll(); return null; } while (clusterStateManager.getState() != ClusterState.ACTIVE) { if (cancelRecovery) { recoveryTask = null; + notifyAll(); return null; } wait(); @@ -398,8 +409,15 @@ } synchronized (this) { try { + if (cancelRecovery) { + recoveryTask = null; + notifyAll(); + return null; + } setState(ActivityState.RECOVERING); doStart(metadataProvider); + recoveryTask = null; + notifyAll(); return null; } catch (Exception e) { LOGGER.log(level, "Attempt to revive " + entityId + " failed", e); @@ -409,6 +427,14 @@ metadataProvider.getLocks().reset(); } notifyAll(); + } + } + // Recovery task is essentially over now either through failure or through
cancellation(stop) + synchronized (this) { + recoveryTask = null; + notifyAll(); + if (state != ActivityState.TEMPORARILY_FAILED) { + return null; } } IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); @@ -422,7 +448,6 @@ synchronized (this) { if (state == ActivityState.TEMPORARILY_FAILED) { setState(ActivityState.PERMANENTLY_FAILED); - recoveryTask = null; } notifyAll(); } @@ -464,49 +489,40 @@ throws HyracksDataException, AlgebricksException; @Override - public void stop(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException { - Future<Void> aRecoveryTask = null; - synchronized (this) { - waitForNonTransitionState(); - if (state != ActivityState.RUNNING && state != ActivityState.PERMANENTLY_FAILED - && state != ActivityState.TEMPORARILY_FAILED) { - throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state); - } - if (state == ActivityState.TEMPORARILY_FAILED || state == ActivityState.PERMANENTLY_FAILED) { - if (recoveryTask != null) { - aRecoveryTask = recoveryTask; - cancelRecovery = true; - recoveryTask.cancel(true); - } - setState(ActivityState.STOPPED); - try { - setRunning(metadataProvider, false); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Failed to set the entity state as not running " + entityId, e); - throw HyracksDataException.create(e); - } - } else if (state == ActivityState.RUNNING) { - setState(ActivityState.STOPPING); - try { - doStop(metadataProvider); - setRunning(metadataProvider, false); - } catch (Exception e) { - setState(ActivityState.PERMANENTLY_FAILED); - LOGGER.log(Level.SEVERE, "Failed to stop the entity " + entityId, e); - throw HyracksDataException.create(e); - } - } else { - throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state); - } + public synchronized void stop(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException { + 
waitForNonTransitionState(); + if (state != ActivityState.RUNNING && state != ActivityState.PERMANENTLY_FAILED + && state != ActivityState.TEMPORARILY_FAILED) { + throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state); } - try { - if (aRecoveryTask != null) { - aRecoveryTask.get(); + if (state == ActivityState.TEMPORARILY_FAILED || state == ActivityState.PERMANENTLY_FAILED) { + if (recoveryTask != null) { + setState(ActivityState.STOPPING); + cancelRecovery = true; + recoveryTask.cancel(true); + while (recoveryTask != null) { + wait(); + } } - } catch (InterruptedException e) { - throw e; - } catch (Exception e) { - throw HyracksDataException.create(e); + setState(ActivityState.STOPPED); + try { + setRunning(metadataProvider, false); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Failed to set the entity state as not running " + entityId, e); + throw HyracksDataException.create(e); + } + } else if (state == ActivityState.RUNNING) { + setState(ActivityState.STOPPING); + try { + doStop(metadataProvider); + setRunning(metadataProvider, false); + } catch (Exception e) { + setState(ActivityState.PERMANENTLY_FAILED); + LOGGER.log(Level.SEVERE, "Failed to stop the entity " + entityId, e); + throw HyracksDataException.create(e); + } + } else { + throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state); } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java index a256bcf..d38a363 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java @@ -302,6 +302,7 @@ listener.onStop(Behavior.SUCCEED); Action stopAction = users[2].stopActivity(listener); stopAction.sync(); + assertSuccess(stopAction); 
Assert.assertEquals(ActivityState.STOPPED, listener.getState()); } @@ -423,8 +424,10 @@ Assert.assertEquals(ActivityState.RUNNING, listener.getState()); listener.onStop(Behavior.SUCCEED); WaitForStateSubscriber subscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED)); - users[1].stopActivity(listener); + Action stopAction = users[1].stopActivity(listener); subscriber.sync(); + stopAction.sync(); + assertSuccess(stopAction); Assert.assertEquals(ActivityState.STOPPED, listener.getState()); } @@ -485,10 +488,12 @@ new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); recoveringSubscriber.sync(); listener.onStop(Behavior.SUCCEED); - users[0].stopActivity(listener); + Action stopAction = users[0].stopActivity(listener); listener.allowStep(); runningSubscriber.sync(); stopSubscriber.sync(); + stopAction.sync(); + assertSuccess(stopAction); Assert.assertEquals(ActivityState.STOPPED, listener.getState()); } @@ -511,10 +516,12 @@ new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); recoveringSubscriber.sync(); listener.onStop(Behavior.SUCCEED); - users[0].stopActivity(listener); + Action stopAction = users[0].stopActivity(listener); listener.allowStep(); secondTempFailSubscriber.sync(); stopSubscriber.sync(); + stopAction.sync(); + assertSuccess(stopAction); Assert.assertEquals(ActivityState.STOPPED, listener.getState()); } @@ -537,10 +544,12 @@ new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); recoveringSubscriber.sync(); listener.onStop(Behavior.SUCCEED); - users[0].stopActivity(listener); + Action stopAction = users[0].stopActivity(listener); listener.allowStep(); secondTempFailSubscriber.sync(); stopSubscriber.sync(); + stopAction.sync(); + assertSuccess(stopAction); Assert.assertEquals(ActivityState.STOPPED, listener.getState()); } diff --git 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java index 85ba115..bf0e1dd 100644 --- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java @@ -23,7 +23,7 @@ public class ByteArrayAccessibleOutputStream extends ByteArrayOutputStream { - private static final int MAX_SIZE = 1024 * 1024 * 64; + private static final int MAX_SIZE = 1024 * 1024 * 32; private static final double BUFFER_INCREMENT_FACTOR = 1.5; public ByteArrayAccessibleOutputStream() { diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java index ca20f4a..e190bfa 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java @@ -34,6 +34,7 @@ import org.apache.hyracks.http.api.IServlet; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -83,6 +84,8 @@ executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(requestQueueSize), runnable -> new Thread(runnable, "HttpExecutor(port:" + port + ")-" + threadId.getAndIncrement())); + long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK; + 
LOGGER.log(Level.INFO, "The direct memory budget for this server is " + directMemoryBudget + " bytes"); } public final void start() throws Exception { // NOSONAR @@ -194,6 +197,7 @@ Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length()); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK) .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new HttpServerInitializer(this)); channel = b.bind(port).sync().channel(); @@ -255,7 +259,7 @@ } protected HttpServerHandler createHttpHandler(int chunkSize) { - return new HttpServerHandler<>(this, chunkSize); + return new HttpServerHandler(this, chunkSize); } public ExecutorService getExecutor() { diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java index 30df003..863eddd 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java @@ -18,7 +18,13 @@ */ package org.apache.hyracks.http.servlet; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryPoolMXBean; +import java.lang.management.MemoryType; +import java.lang.reflect.Field; +import java.util.List; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; @@ -27,10 +33,13 @@ import org.apache.hyracks.http.server.AbstractServlet; import org.apache.hyracks.http.server.utils.HttpUtil; +import io.netty.buffer.PooledByteBufAllocator; import 
io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.util.internal.PlatformDependent; public class ChattyServlet extends AbstractServlet { private static final Logger LOGGER = Logger.getLogger(ChattyServlet.class.getName()); + private static long MAX = 0L; private byte[] bytes; public ChattyServlet(ConcurrentMap<String, Object> ctx, String[] paths) { @@ -52,5 +61,57 @@ for (int i = 0; i < 100; i++) { response.outputStream().write(bytes); } + printMemUsage(); + } + + @SuppressWarnings("restriction") + public synchronized static void printMemUsage() { + StringBuilder report = new StringBuilder(); + report.append("sun.misc.VM.maxDirectMemory: "); + report.append(sun.misc.VM.maxDirectMemory()); + report.append('\n'); + report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed(): "); + report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed()); + report.append('\n'); + report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity(): "); + report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity()); + report.append('\n'); + report.append("ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage(): "); + report.append(ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage()); + report.append('\n'); + report.append("---------------------------- Beans ----------------------------"); + report.append('\n'); + List<MemoryPoolMXBean> memPoolBeans = ManagementFactory.getMemoryPoolMXBeans(); + for (MemoryPoolMXBean bean : memPoolBeans) { + if (bean.isValid() && bean.getType() == MemoryType.NON_HEAP) { + report.append(bean.getName()); + report.append(": "); + report.append(bean.getUsage()); + report.append('\n'); + } + } + report.append("---------------------------- Netty ----------------------------"); + report.append('\n'); + try { + Field field = PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER"); + 
field.setAccessible(true); + AtomicLong usedDirectMemory = (AtomicLong) field.get(null); + long used = usedDirectMemory.get(); + report.append("Current PlatformDependent.DIRECT_MEMORY_COUNTER: "); + report.append(used); + report.append('\n'); + report.append("Maximum PlatformDependent.DIRECT_MEMORY_COUNTER: "); + MAX = Math.max(MAX, used); + report.append(MAX); + report.append('\n'); + report.append('\n'); + } catch (Throwable th) { + th.printStackTrace(); + LOGGER.log(Level.WARNING, "Failed to access PlatformDependent.DIRECT_MEMORY_COUNTER", th); + return; + } + report.append("--------------- PooledByteBufAllocator.DEFAULT ----------------"); + report.append(PooledByteBufAllocator.DEFAULT.dumpStats()); + LOGGER.log(Level.INFO, report.toString()); } } \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java index 854980e..6512dc1 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java @@ -94,6 +94,7 @@ @Test public void testChattyServer() throws Exception { + ChattyServlet.printMemUsage(); int numRequests = 64; int numExecutors = 32; int serverQueueSize = 32; -- To view, visit https://asterix-gerrit.ics.uci.edu/1941 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I4a324f10db52e77c7e69ca4246b9d84b4479f25d Gerrit-PatchSet: 9 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: Till Westmann <ti...@apache.org> Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>