abdullah alamoudi has submitted this change and it was merged.

Change subject: [ASTERIXDB-2039][OTH] Log Http Server direct memory budget
......................................................................


[ASTERIXDB-2039][OTH] Log Http Server direct memory budget

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Calculate mem budget = number of executors x max high watermark.
- Log the calculated budget.

Change-Id: I4a324f10db52e77c7e69ca4246b9d84b4479f25d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1941
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhub...@apache.org>
---
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
M 
hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
7 files changed, 141 insertions(+), 50 deletions(-)

Approvals:
  Jenkins: Verified; No violations found; ; Verified
  Murtadha Hubail: Looks good to me, approved



diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index 9ace417..6041ab5 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -72,7 +72,7 @@
 
     @Override
     public String toString() {
-        return ActivePartitionMessage.class.getSimpleName() + event;
+        return ActivePartitionMessage.class.getSimpleName() + '-' + event;
     }
 
     @Override
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index e30272c..c6f41bf 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -362,7 +362,16 @@
             cancelRecovery = false;
             setState(ActivityState.TEMPORARILY_FAILED);
             LOGGER.log(level, "Recovery task has been submitted");
-            recoveryTask = executor.submit(() -> doRecover(policy));
+            recoveryTask = executor.submit(() -> {
+                String nameBefore = Thread.currentThread().getName();
+                try {
+                    Thread.currentThread().setName("RecoveryTask (" + entityId 
+ ")");
+                    doRecover(policy);
+                } finally {
+                    Thread.currentThread().setName(nameBefore);
+                }
+                return null;
+            });
         }
     }
 
@@ -378,11 +387,13 @@
             synchronized (this) {
                 if (cancelRecovery) {
                     recoveryTask = null;
+                    notifyAll();
                     return null;
                 }
                 while (clusterStateManager.getState() != ClusterState.ACTIVE) {
                     if (cancelRecovery) {
                         recoveryTask = null;
+                        notifyAll();
                         return null;
                     }
                     wait();
@@ -398,8 +409,15 @@
             }
             synchronized (this) {
                 try {
+                    if (cancelRecovery) {
+                        recoveryTask = null;
+                        notifyAll();
+                        return null;
+                    }
                     setState(ActivityState.RECOVERING);
                     doStart(metadataProvider);
+                    recoveryTask = null;
+                    notifyAll();
                     return null;
                 } catch (Exception e) {
                     LOGGER.log(level, "Attempt to revive " + entityId + " 
failed", e);
@@ -409,6 +427,14 @@
                     metadataProvider.getLocks().reset();
                 }
                 notifyAll();
+            }
+        }
+        // Recovery task is essentially over now either through failure or 
through cancellation(stop)
+        synchronized (this) {
+            recoveryTask = null;
+            notifyAll();
+            if (state != ActivityState.TEMPORARILY_FAILED) {
+                return null;
             }
         }
         IMetadataLockManager lockManager = 
metadataProvider.getApplicationContext().getMetadataLockManager();
@@ -422,7 +448,6 @@
             synchronized (this) {
                 if (state == ActivityState.TEMPORARILY_FAILED) {
                     setState(ActivityState.PERMANENTLY_FAILED);
-                    recoveryTask = null;
                 }
                 notifyAll();
             }
@@ -464,49 +489,40 @@
             throws HyracksDataException, AlgebricksException;
 
     @Override
-    public void stop(MetadataProvider metadataProvider) throws 
HyracksDataException, InterruptedException {
-        Future<Void> aRecoveryTask = null;
-        synchronized (this) {
-            waitForNonTransitionState();
-            if (state != ActivityState.RUNNING && state != 
ActivityState.PERMANENTLY_FAILED
-                    && state != ActivityState.TEMPORARILY_FAILED) {
-                throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, 
state);
-            }
-            if (state == ActivityState.TEMPORARILY_FAILED || state == 
ActivityState.PERMANENTLY_FAILED) {
-                if (recoveryTask != null) {
-                    aRecoveryTask = recoveryTask;
-                    cancelRecovery = true;
-                    recoveryTask.cancel(true);
-                }
-                setState(ActivityState.STOPPED);
-                try {
-                    setRunning(metadataProvider, false);
-                } catch (Exception e) {
-                    LOGGER.log(Level.SEVERE, "Failed to set the entity state 
as not running " + entityId, e);
-                    throw HyracksDataException.create(e);
-                }
-            } else if (state == ActivityState.RUNNING) {
-                setState(ActivityState.STOPPING);
-                try {
-                    doStop(metadataProvider);
-                    setRunning(metadataProvider, false);
-                } catch (Exception e) {
-                    setState(ActivityState.PERMANENTLY_FAILED);
-                    LOGGER.log(Level.SEVERE, "Failed to stop the entity " + 
entityId, e);
-                    throw HyracksDataException.create(e);
-                }
-            } else {
-                throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, 
state);
-            }
+    public synchronized void stop(MetadataProvider metadataProvider) throws 
HyracksDataException, InterruptedException {
+        waitForNonTransitionState();
+        if (state != ActivityState.RUNNING && state != 
ActivityState.PERMANENTLY_FAILED
+                && state != ActivityState.TEMPORARILY_FAILED) {
+            throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, 
state);
         }
-        try {
-            if (aRecoveryTask != null) {
-                aRecoveryTask.get();
+        if (state == ActivityState.TEMPORARILY_FAILED || state == 
ActivityState.PERMANENTLY_FAILED) {
+            if (recoveryTask != null) {
+                setState(ActivityState.STOPPING);
+                cancelRecovery = true;
+                recoveryTask.cancel(true);
+                while (recoveryTask != null) {
+                    wait();
+                }
             }
-        } catch (InterruptedException e) {
-            throw e;
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
+            setState(ActivityState.STOPPED);
+            try {
+                setRunning(metadataProvider, false);
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Failed to set the entity state as 
not running " + entityId, e);
+                throw HyracksDataException.create(e);
+            }
+        } else if (state == ActivityState.RUNNING) {
+            setState(ActivityState.STOPPING);
+            try {
+                doStop(metadataProvider);
+                setRunning(metadataProvider, false);
+            } catch (Exception e) {
+                setState(ActivityState.PERMANENTLY_FAILED);
+                LOGGER.log(Level.SEVERE, "Failed to stop the entity " + 
entityId, e);
+                throw HyracksDataException.create(e);
+            }
+        } else {
+            throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, 
state);
         }
     }
 
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index a256bcf..d38a363 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -302,6 +302,7 @@
         listener.onStop(Behavior.SUCCEED);
         Action stopAction = users[2].stopActivity(listener);
         stopAction.sync();
+        assertSuccess(stopAction);
         Assert.assertEquals(ActivityState.STOPPED, listener.getState());
     }
 
@@ -423,8 +424,10 @@
         Assert.assertEquals(ActivityState.RUNNING, listener.getState());
         listener.onStop(Behavior.SUCCEED);
         WaitForStateSubscriber subscriber = new 
WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
-        users[1].stopActivity(listener);
+        Action stopAction = users[1].stopActivity(listener);
         subscriber.sync();
+        stopAction.sync();
+        assertSuccess(stopAction);
         Assert.assertEquals(ActivityState.STOPPED, listener.getState());
     }
 
@@ -485,10 +488,12 @@
                 new WaitForStateSubscriber(listener, 
EnumSet.of(ActivityState.RECOVERING));
         recoveringSubscriber.sync();
         listener.onStop(Behavior.SUCCEED);
-        users[0].stopActivity(listener);
+        Action stopAction = users[0].stopActivity(listener);
         listener.allowStep();
         runningSubscriber.sync();
         stopSubscriber.sync();
+        stopAction.sync();
+        assertSuccess(stopAction);
         Assert.assertEquals(ActivityState.STOPPED, listener.getState());
     }
 
@@ -511,10 +516,12 @@
                 new WaitForStateSubscriber(listener, 
EnumSet.of(ActivityState.RECOVERING));
         recoveringSubscriber.sync();
         listener.onStop(Behavior.SUCCEED);
-        users[0].stopActivity(listener);
+        Action stopAction = users[0].stopActivity(listener);
         listener.allowStep();
         secondTempFailSubscriber.sync();
         stopSubscriber.sync();
+        stopAction.sync();
+        assertSuccess(stopAction);
         Assert.assertEquals(ActivityState.STOPPED, listener.getState());
     }
 
@@ -537,10 +544,12 @@
                 new WaitForStateSubscriber(listener, 
EnumSet.of(ActivityState.RECOVERING));
         recoveringSubscriber.sync();
         listener.onStop(Behavior.SUCCEED);
-        users[0].stopActivity(listener);
+        Action stopAction = users[0].stopActivity(listener);
         listener.allowStep();
         secondTempFailSubscriber.sync();
         stopSubscriber.sync();
+        stopAction.sync();
+        assertSuccess(stopAction);
         Assert.assertEquals(ActivityState.STOPPED, listener.getState());
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
index 85ba115..bf0e1dd 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
@@ -23,7 +23,7 @@
 
 public class ByteArrayAccessibleOutputStream extends ByteArrayOutputStream {
 
-    private static final int MAX_SIZE = 1024 * 1024 * 64;
+    private static final int MAX_SIZE = 1024 * 1024 * 32;
     private static final double BUFFER_INCREMENT_FACTOR = 1.5;
 
     public ByteArrayAccessibleOutputStream() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index ca20f4a..e190bfa 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -34,6 +34,7 @@
 import org.apache.hyracks.http.api.IServlet;
 
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
@@ -83,6 +84,8 @@
         executor = new ThreadPoolExecutor(numExecutorThreads, 
numExecutorThreads, 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<>(requestQueueSize),
                 runnable -> new Thread(runnable, "HttpExecutor(port:" + port + 
")-" + threadId.getAndIncrement()));
+        long directMemoryBudget = numExecutorThreads * (long) 
HIGH_WRITE_BUFFER_WATER_MARK;
+        LOGGER.log(Level.INFO, "The direct memory budget for this server is " 
+ directMemoryBudget + " bytes");
     }
 
     public final void start() throws Exception { // NOSONAR
@@ -194,6 +197,7 @@
         Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - 
l1.getPaths()[0].length());
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+                .childOption(ChannelOption.ALLOCATOR, 
PooledByteBufAllocator.DEFAULT)
                 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, 
WRITE_BUFFER_WATER_MARK)
                 .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new 
HttpServerInitializer(this));
         channel = b.bind(port).sync().channel();
@@ -255,7 +259,7 @@
     }
 
     protected HttpServerHandler createHttpHandler(int chunkSize) {
-        return new HttpServerHandler<>(this, chunkSize);
+        return new HttpServerHandler(this, chunkSize);
     }
 
     public ExecutorService getExecutor() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
index 30df003..863eddd 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
@@ -18,7 +18,13 @@
  */
 package org.apache.hyracks.http.servlet;
 
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryType;
+import java.lang.reflect.Field;
+import java.util.List;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -27,10 +33,13 @@
 import org.apache.hyracks.http.server.AbstractServlet;
 import org.apache.hyracks.http.server.utils.HttpUtil;
 
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.util.internal.PlatformDependent;
 
 public class ChattyServlet extends AbstractServlet {
     private static final Logger LOGGER = 
Logger.getLogger(ChattyServlet.class.getName());
+    private static long MAX = 0L;
     private byte[] bytes;
 
     public ChattyServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
@@ -52,5 +61,57 @@
         for (int i = 0; i < 100; i++) {
             response.outputStream().write(bytes);
         }
+        printMemUsage();
+    }
+
+    @SuppressWarnings("restriction")
+    public synchronized static void printMemUsage() {
+        StringBuilder report = new StringBuilder();
+        report.append("sun.misc.VM.maxDirectMemory: ");
+        report.append(sun.misc.VM.maxDirectMemory());
+        report.append('\n');
+        
report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed():
 ");
+        
report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed());
+        report.append('\n');
+        
report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity():
 ");
+        
report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity());
+        report.append('\n');
+        
report.append("ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage(): ");
+        
report.append(ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage());
+        report.append('\n');
+        report.append("---------------------------- Beans 
----------------------------");
+        report.append('\n');
+        List<MemoryPoolMXBean> memPoolBeans = 
ManagementFactory.getMemoryPoolMXBeans();
+        for (MemoryPoolMXBean bean : memPoolBeans) {
+            if (bean.isValid() && bean.getType() == MemoryType.NON_HEAP) {
+                report.append(bean.getName());
+                report.append(": ");
+                report.append(bean.getUsage());
+                report.append('\n');
+            }
+        }
+        report.append("---------------------------- Netty 
----------------------------");
+        report.append('\n');
+        try {
+            Field field = 
PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER");
+            field.setAccessible(true);
+            AtomicLong usedDirectMemory = (AtomicLong) field.get(null);
+            long used = usedDirectMemory.get();
+            report.append("Current PlatformDependent.DIRECT_MEMORY_COUNTER: ");
+            report.append(used);
+            report.append('\n');
+            report.append("Maximum PlatformDependent.DIRECT_MEMORY_COUNTER: ");
+            MAX = Math.max(MAX, used);
+            report.append(MAX);
+            report.append('\n');
+            report.append('\n');
+        } catch (Throwable th) {
+            th.printStackTrace();
+            LOGGER.log(Level.WARNING, "Failed to access 
PlatformDependent.DIRECT_MEMORY_COUNTER", th);
+            return;
+        }
+        report.append("--------------- PooledByteBufAllocator.DEFAULT 
----------------");
+        report.append(PooledByteBufAllocator.DEFAULT.dumpStats());
+        LOGGER.log(Level.INFO, report.toString());
     }
 }
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 854980e..6512dc1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -94,6 +94,7 @@
 
     @Test
     public void testChattyServer() throws Exception {
+        ChattyServlet.printMemUsage();
         int numRequests = 64;
         int numExecutors = 32;
         int serverQueueSize = 32;

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1941
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I4a324f10db52e77c7e69ca4246b9d84b4479f25d
Gerrit-PatchSet: 9
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>

Reply via email to