BryanMLima commented on code in PR #9840:
URL: https://github.com/apache/cloudstack/pull/9840#discussion_r1894169638


##########
agent/src/main/java/com/cloud/agent/Agent.java:
##########
@@ -125,128 +128,134 @@ public int value() {
         }
     }
 
-    List<IAgentControlListener> _controlListeners = new 
ArrayList<IAgentControlListener>();
+    CopyOnWriteArrayList<IAgentControlListener> controlListeners = new 
CopyOnWriteArrayList<>();
 
-    IAgentShell _shell;
-    NioConnection _connection;
-    ServerResource _resource;
-    Link _link;
-    Long _id;
+    IAgentShell shell;
+    NioConnection connection;
+    ServerResource serverResource;
+    Link link;
+    Long id;
 
-    Timer _timer = new Timer("Agent Timer");
-    Timer certTimer;
-    Timer hostLBTimer;
+    ScheduledExecutorService selfTaskExecutor;
+    ScheduledExecutorService certExecutor;
+    ScheduledExecutorService hostLbCheckExecutor;
 
-    List<WatchTask> _watchList = new ArrayList<WatchTask>();
-    long _sequence = 0;
-    long _lastPingResponseTime = 0;
-    long _pingInterval = 0;
-    AtomicInteger _inProgress = new AtomicInteger();
+    CopyOnWriteArrayList<ScheduledFuture<?>> watchList = new 
CopyOnWriteArrayList<>();
+    AtomicLong sequence = new AtomicLong(0);
+    AtomicLong lastPingResponseTime = new AtomicLong(0L);
+    long pingInterval = 0;
+    AtomicInteger commandsInProgress = new AtomicInteger(0);
 
-    StartupTask _startup = null;
-    long _startupWaitDefault = 180000;
-    long _startupWait = _startupWaitDefault;
-    boolean _reconnectAllowed = true;
-    //For time sentitive task, e.g. PingTask
-    ThreadPoolExecutor _ugentTaskPool;
-    ExecutorService _executor;
+    private final AtomicReference<StartupTask> startupTask = new 
AtomicReference<>();
+    private static final long DEFAULT_STARTUP_WAIT = 180;
+    long startupWait = DEFAULT_STARTUP_WAIT;
+    boolean reconnectAllowed = true;
 
-    Thread _shutdownThread = new ShutdownThread(this);
+    //For time sensitive task, e.g. PingTask
+    ThreadPoolExecutor outRequestHandler;
+    ExecutorService requestHandler;
 
-    private String _keystoreSetupPath;
-    private String _keystoreCertImportPath;
+    Thread shutdownThread = new ShutdownThread(this);
 
-    // for simulator use only
-    public Agent(final IAgentShell shell) {
-        _shell = shell;
-        _link = null;
+    private String keystoreSetupSetupPath;
+    private String keystoreCertImportScriptPath;
 
-        _connection = new NioClient("Agent", _shell.getNextHost(), 
_shell.getPort(), _shell.getWorkers(), this);
+    private String hostname;
 
-        Runtime.getRuntime().addShutdownHook(_shutdownThread);
+    protected String getLinkLog(final Link link) {
+        if (link == null) {
+            return "";
+        }
+        StringBuilder str = new StringBuilder();
+        if (logger.isTraceEnabled()) {
+            str.append(System.identityHashCode(link)).append("-");
+        }
+        str.append(link.getSocketAddress());
+        return str.toString();
+    }
 
-        _ugentTaskPool =
-                new ThreadPoolExecutor(shell.getPingRetries(), 2 * 
shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), 
new NamedThreadFactory(
-                        "UgentTask"));
+    protected String getAgentName() {
+        return (serverResource != null && 
serverResource.isAppendAgentNameToLogs() &&
+                StringUtils.isNotBlank(serverResource.getName())) ?
+                serverResource.getName() :
+                "Agent";
+    }
 
-        _executor =
-                new ThreadPoolExecutor(_shell.getWorkers(), 5 * 
_shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory(
-                        "agentRequest-Handler"));
+    protected void setupShutdownHookAndInitExecutors() {
+        if (logger.isTraceEnabled()) {
+            logger.trace("Adding shutdown hook");
+        }

Review Comment:
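   There is no need for an `isTraceEnabled()` guard here: the message is a constant string, so nothing is computed before the logger checks the level itself.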
   ```suggestion
           logger.trace("Adding shutdown hook");
   ```



##########
agent/src/main/java/com/cloud/agent/Agent.java:
##########
@@ -125,128 +128,134 @@ public int value() {
         }
     }
 
-    List<IAgentControlListener> _controlListeners = new 
ArrayList<IAgentControlListener>();
+    CopyOnWriteArrayList<IAgentControlListener> controlListeners = new 
CopyOnWriteArrayList<>();
 
-    IAgentShell _shell;
-    NioConnection _connection;
-    ServerResource _resource;
-    Link _link;
-    Long _id;
+    IAgentShell shell;
+    NioConnection connection;
+    ServerResource serverResource;
+    Link link;
+    Long id;
 
-    Timer _timer = new Timer("Agent Timer");
-    Timer certTimer;
-    Timer hostLBTimer;
+    ScheduledExecutorService selfTaskExecutor;
+    ScheduledExecutorService certExecutor;
+    ScheduledExecutorService hostLbCheckExecutor;
 
-    List<WatchTask> _watchList = new ArrayList<WatchTask>();
-    long _sequence = 0;
-    long _lastPingResponseTime = 0;
-    long _pingInterval = 0;
-    AtomicInteger _inProgress = new AtomicInteger();
+    CopyOnWriteArrayList<ScheduledFuture<?>> watchList = new 
CopyOnWriteArrayList<>();
+    AtomicLong sequence = new AtomicLong(0);
+    AtomicLong lastPingResponseTime = new AtomicLong(0L);
+    long pingInterval = 0;
+    AtomicInteger commandsInProgress = new AtomicInteger(0);
 
-    StartupTask _startup = null;
-    long _startupWaitDefault = 180000;
-    long _startupWait = _startupWaitDefault;
-    boolean _reconnectAllowed = true;
-    //For time sentitive task, e.g. PingTask
-    ThreadPoolExecutor _ugentTaskPool;
-    ExecutorService _executor;
+    private final AtomicReference<StartupTask> startupTask = new 
AtomicReference<>();
+    private static final long DEFAULT_STARTUP_WAIT = 180;
+    long startupWait = DEFAULT_STARTUP_WAIT;
+    boolean reconnectAllowed = true;
 
-    Thread _shutdownThread = new ShutdownThread(this);
+    //For time sensitive task, e.g. PingTask
+    ThreadPoolExecutor outRequestHandler;
+    ExecutorService requestHandler;
 
-    private String _keystoreSetupPath;
-    private String _keystoreCertImportPath;
+    Thread shutdownThread = new ShutdownThread(this);
 
-    // for simulator use only
-    public Agent(final IAgentShell shell) {
-        _shell = shell;
-        _link = null;
+    private String keystoreSetupSetupPath;
+    private String keystoreCertImportScriptPath;
 
-        _connection = new NioClient("Agent", _shell.getNextHost(), 
_shell.getPort(), _shell.getWorkers(), this);
+    private String hostname;
 
-        Runtime.getRuntime().addShutdownHook(_shutdownThread);
+    protected String getLinkLog(final Link link) {
+        if (link == null) {
+            return "";
+        }
+        StringBuilder str = new StringBuilder();
+        if (logger.isTraceEnabled()) {
+            str.append(System.identityHashCode(link)).append("-");
+        }
+        str.append(link.getSocketAddress());
+        return str.toString();
+    }
 
-        _ugentTaskPool =
-                new ThreadPoolExecutor(shell.getPingRetries(), 2 * 
shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), 
new NamedThreadFactory(
-                        "UgentTask"));
+    protected String getAgentName() {
+        return (serverResource != null && 
serverResource.isAppendAgentNameToLogs() &&
+                StringUtils.isNotBlank(serverResource.getName())) ?
+                serverResource.getName() :
+                "Agent";
+    }
 
-        _executor =
-                new ThreadPoolExecutor(_shell.getWorkers(), 5 * 
_shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory(
-                        "agentRequest-Handler"));
+    protected void setupShutdownHookAndInitExecutors() {
+        if (logger.isTraceEnabled()) {
+            logger.trace("Adding shutdown hook");
+        }
+        Runtime.getRuntime().addShutdownHook(shutdownThread);
+        selfTaskExecutor = Executors.newScheduledThreadPool(1, new 
NamedThreadFactory("Agent-SelfTask"));
+        outRequestHandler = new ThreadPoolExecutor(shell.getPingRetries(), 2 * 
shell.getPingRetries(), 10, TimeUnit.MINUTES,
+                new SynchronousQueue<>(), new 
NamedThreadFactory("AgentOutRequest-Handler"));
+        requestHandler = new ThreadPoolExecutor(shell.getWorkers(), 5 * 
shell.getWorkers(), 1, TimeUnit.DAYS,
+                new LinkedBlockingQueue<>(), new 
NamedThreadFactory("AgentRequest-Handler"));
     }
 
-    public Agent(final IAgentShell shell, final int localAgentId, final 
ServerResource resource) throws ConfigurationException {
-        _shell = shell;
-        _resource = resource;
-        _link = null;
+    // for simulator use only

Review Comment:
   This comment could be converted to Javadoc.



##########
agent/src/main/java/com/cloud/agent/Agent.java:
##########
@@ -332,108 +335,127 @@ public void stop(final String reason, final String 
detail) {
             } catch (final InterruptedException e) {
                 logger.debug("Who the heck interrupted me here?");
             }
-            _connection.stop();
-            _connection = null;
-            _link = null;
+            connection.stop();
+            connection = null;
+            link = null;
         }
 
-        if (_resource != null) {
-            _resource.stop();
-            _resource = null;
+        if (serverResource != null) {
+            serverResource.stop();
+            serverResource = null;
         }
 
-        if (_startup != null) {
-            _startup = null;
+        if (startupTask.get() != null) {
+            startupTask.set(null);
         }
 
-        if (_ugentTaskPool != null) {
-            _ugentTaskPool.shutdownNow();
-            _ugentTaskPool = null;
+        if (outRequestHandler != null) {
+            outRequestHandler.shutdownNow();
+            outRequestHandler = null;
         }
 
-        if (_executor != null) {
-            _executor.shutdown();
-            _executor = null;
+        if (requestHandler != null) {
+            requestHandler.shutdown();
+            requestHandler = null;
         }
 
-        if (_timer != null) {
-            _timer.cancel();
-            _timer = null;
+        if (selfTaskExecutor != null) {
+            selfTaskExecutor.shutdown();
+            selfTaskExecutor = null;
         }
 
-        if (hostLBTimer != null) {
-            hostLBTimer.cancel();
-            hostLBTimer = null;
+        if (hostLbCheckExecutor != null) {
+            hostLbCheckExecutor.shutdown();
+            hostLbCheckExecutor = null;
         }
 
-        if (certTimer != null) {
-            certTimer.cancel();
-            certTimer = null;
+        if (certExecutor != null) {
+            certExecutor.shutdown();
+            certExecutor = null;
         }
     }
 
     public Long getId() {
-        return _id;
+        return id;
     }
 
     public void setId(final Long id) {
         logger.debug("Set agent id {}", id);
-        _id = id;
-        _shell.setPersistentProperty(getResourceName(), "id", 
Long.toString(id));
+        this.id = id;
+        shell.setPersistentProperty(getResourceName(), "id", 
Long.toString(id));
     }
 
-    private synchronized void scheduleServicesRestartTask() {
-        if (certTimer != null) {
-            certTimer.cancel();
-            certTimer.purge();
+    private void scheduleCertificateRenewalTask() {
+        String name = "CertificateRenewalTask";
+        if (certExecutor != null && !certExecutor.isShutdown()) {
+            certExecutor.shutdown();
+            try {
+                if (!certExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
+                    certExecutor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                logger.debug("Forcing {} shutdown as it did not shutdown in 
the desired time due to: {}",
+                                name, e.getMessage());
+                certExecutor.shutdownNow();
+            }
         }
-        certTimer = new Timer("Certificate Renewal Timer");
-        certTimer.schedule(new PostCertificateRenewalTask(this), 5000L);
+        certExecutor = Executors.newSingleThreadScheduledExecutor((new 
NamedThreadFactory(name)));
+        certExecutor.schedule(new PostCertificateRenewalTask(this), 5, 
TimeUnit.SECONDS);
     }
 
-    private synchronized void scheduleHostLBCheckerTask(final long 
checkInterval) {
-        if (hostLBTimer != null) {
-            hostLBTimer.cancel();
+    private void scheduleHostLBCheckerTask(final long checkInterval) {
+        String name = "HostLBCheckerTask";
+        if (hostLbCheckExecutor != null && !hostLbCheckExecutor.isShutdown()) {
+            hostLbCheckExecutor.shutdown();
+            try {
+                if (!hostLbCheckExecutor.awaitTermination(1, 
TimeUnit.SECONDS)) {
+                    hostLbCheckExecutor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                logger.debug("Forcing {} shutdown as it did not shutdown in 
the desired time due to: {}",
+                        name, e.getMessage());
+                hostLbCheckExecutor.shutdownNow();
+            }
         }
         if (checkInterval > 0L) {
-            logger.info("Scheduling preferred host timer task with 
host.lb.interval={}ms", checkInterval);
-            hostLBTimer = new Timer("Host LB Timer");
-            hostLBTimer.scheduleAtFixedRate(new PreferredHostCheckerTask(), 
checkInterval, checkInterval);
+            logger.info("Scheduling preferred host task with 
host.lb.interval={}ms", checkInterval);
+            hostLbCheckExecutor = 
Executors.newSingleThreadScheduledExecutor((new NamedThreadFactory(name)));
+            hostLbCheckExecutor.scheduleAtFixedRate(new 
PreferredHostCheckerTask(), checkInterval, checkInterval,
+                    TimeUnit.MILLISECONDS);
         }
     }
 
     public void scheduleWatch(final Link link, final Request request, final 
long delay, final long period) {
-        synchronized (_watchList) {
-            logger.debug("Adding task with request: {} to watch list", 
request.toString());
 
-            final WatchTask task = new WatchTask(link, request, this);
-            _timer.schedule(task, 0, period);
-            _watchList.add(task);
+        if (logger.isDebugEnabled()) {

Review Comment:
   Same here, there is no need for an `isDebugEnabled()` guard.



##########
agent/src/main/java/com/cloud/agent/Agent.java:
##########
@@ -444,59 +466,87 @@ protected void cancelTasks() {
      * when host is added back
      */
     protected void cleanupAgentZoneProperties() {
-        _shell.setPersistentProperty(null, "zone", "");
-        _shell.setPersistentProperty(null, "cluster", "");
-        _shell.setPersistentProperty(null, "pod", "");
+        shell.setPersistentProperty(null, "zone", "");
+        shell.setPersistentProperty(null, "cluster", "");
+        shell.setPersistentProperty(null, "pod", "");
+    }
+
+    public void lockStartupTask(final Link link) {
+        logger.debug("Creating startup task for link: {}", getLinkLog(link));
+        StartupTask currentTask = startupTask.get();
+        if (currentTask != null) {
+            logger.warn("A Startup task is already locked or in progress, 
cannot create for link {}",
+                    getLinkLog(link));
+            return;
+        }
+        currentTask = new StartupTask(link);
+        if (startupTask.compareAndSet(null, currentTask)) {
+            selfTaskExecutor.schedule(currentTask, startupWait, 
TimeUnit.SECONDS);
+            return;
+        }
+        logger.warn("Failed to lock a StartupTask for link: {}", 
getLinkLog(link));
     }
 
-    public synchronized void lockStartupTask(final Link link) {
-        _startup = new StartupTask(link);
-        _timer.schedule(_startup, _startupWait);
+    protected boolean cancelStartupTask() {
+        StartupTask task = startupTask.getAndSet(null);
+        if (task != null) {
+            task.cancel();
+            return true;
+        }
+        return false;
     }
 
     public void sendStartup(final Link link) {
-        final StartupCommand[] startup = _resource.initialize();
+        final StartupCommand[] startup = serverResource.initialize();
         if (startup != null) {
-            final String msHostList = _shell.getPersistentProperty(null, 
"host");
+            final String msHostList = shell.getPersistentProperty(null, 
"host");
             final Command[] commands = new Command[startup.length];
             for (int i = 0; i < startup.length; i++) {
                 setupStartupCommand(startup[i]);
                 startup[i].setMSHostList(msHostList);
                 commands[i] = startup[i];
             }
-            final Request request = new Request(_id != null ? _id : -1, -1, 
commands, false, false);
+            final Request request = new Request(id != null ? id : -1, -1, 
commands, false, false);
             request.setSequence(getNextSequence());
 
             logger.debug("Sending Startup: {}", request.toString());
             lockStartupTask(link);
             try {
                 link.send(request.toBytes());
             } catch (final ClosedChannelException e) {
-                logger.warn("Unable to send request: {}", request.toString());
+                logger.warn("Unable to send request to {} due to '{}', 
request: {}",
+                        getLinkLog(link), e.getMessage(), request);
             }
 
-            if (_resource instanceof ResourceStatusUpdater) {
-                ((ResourceStatusUpdater) 
_resource).registerStatusUpdater(this);
+            if (serverResource instanceof ResourceStatusUpdater) {
+                ((ResourceStatusUpdater) 
serverResource).registerStatusUpdater(this);
             }
         }
     }
 
-    protected void setupStartupCommand(final StartupCommand startup) {
-        InetAddress addr;
+    protected String retrieveHostname() {
+        if (logger.isTraceEnabled()) {
+            logger.trace("Retrieving hostname with resource={}", 
serverResource.getClass().getSimpleName());
+        }

Review Comment:
   ```suggestion
           logger.trace("Retrieving hostname with resource={}", () -> 
serverResource.getClass().getSimpleName());
   ```



##########
agent/src/main/java/com/cloud/agent/Agent.java:
##########
@@ -332,108 +335,127 @@ public void stop(final String reason, final String 
detail) {
             } catch (final InterruptedException e) {
                 logger.debug("Who the heck interrupted me here?");
             }
-            _connection.stop();
-            _connection = null;
-            _link = null;
+            connection.stop();
+            connection = null;
+            link = null;
         }
 
-        if (_resource != null) {
-            _resource.stop();
-            _resource = null;
+        if (serverResource != null) {
+            serverResource.stop();
+            serverResource = null;
         }
 
-        if (_startup != null) {
-            _startup = null;
+        if (startupTask.get() != null) {
+            startupTask.set(null);
         }
 
-        if (_ugentTaskPool != null) {
-            _ugentTaskPool.shutdownNow();
-            _ugentTaskPool = null;
+        if (outRequestHandler != null) {
+            outRequestHandler.shutdownNow();
+            outRequestHandler = null;
         }
 
-        if (_executor != null) {
-            _executor.shutdown();
-            _executor = null;
+        if (requestHandler != null) {
+            requestHandler.shutdown();
+            requestHandler = null;
         }
 
-        if (_timer != null) {
-            _timer.cancel();
-            _timer = null;
+        if (selfTaskExecutor != null) {
+            selfTaskExecutor.shutdown();
+            selfTaskExecutor = null;
         }
 
-        if (hostLBTimer != null) {
-            hostLBTimer.cancel();
-            hostLBTimer = null;
+        if (hostLbCheckExecutor != null) {
+            hostLbCheckExecutor.shutdown();
+            hostLbCheckExecutor = null;
         }
 
-        if (certTimer != null) {
-            certTimer.cancel();
-            certTimer = null;
+        if (certExecutor != null) {
+            certExecutor.shutdown();
+            certExecutor = null;
         }
     }
 
     public Long getId() {
-        return _id;
+        return id;
     }
 
     public void setId(final Long id) {
         logger.debug("Set agent id {}", id);
-        _id = id;
-        _shell.setPersistentProperty(getResourceName(), "id", 
Long.toString(id));
+        this.id = id;
+        shell.setPersistentProperty(getResourceName(), "id", 
Long.toString(id));
     }
 
-    private synchronized void scheduleServicesRestartTask() {
-        if (certTimer != null) {
-            certTimer.cancel();
-            certTimer.purge();
+    private void scheduleCertificateRenewalTask() {
+        String name = "CertificateRenewalTask";
+        if (certExecutor != null && !certExecutor.isShutdown()) {
+            certExecutor.shutdown();
+            try {
+                if (!certExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
+                    certExecutor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                logger.debug("Forcing {} shutdown as it did not shutdown in 
the desired time due to: {}",
+                                name, e.getMessage());
+                certExecutor.shutdownNow();
+            }
         }
-        certTimer = new Timer("Certificate Renewal Timer");
-        certTimer.schedule(new PostCertificateRenewalTask(this), 5000L);
+        certExecutor = Executors.newSingleThreadScheduledExecutor((new 
NamedThreadFactory(name)));
+        certExecutor.schedule(new PostCertificateRenewalTask(this), 5, 
TimeUnit.SECONDS);
     }
 
-    private synchronized void scheduleHostLBCheckerTask(final long 
checkInterval) {
-        if (hostLBTimer != null) {
-            hostLBTimer.cancel();
+    private void scheduleHostLBCheckerTask(final long checkInterval) {
+        String name = "HostLBCheckerTask";
+        if (hostLbCheckExecutor != null && !hostLbCheckExecutor.isShutdown()) {
+            hostLbCheckExecutor.shutdown();
+            try {
+                if (!hostLbCheckExecutor.awaitTermination(1, 
TimeUnit.SECONDS)) {
+                    hostLbCheckExecutor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                logger.debug("Forcing {} shutdown as it did not shutdown in 
the desired time due to: {}",
+                        name, e.getMessage());
+                hostLbCheckExecutor.shutdownNow();
+            }
         }
         if (checkInterval > 0L) {
-            logger.info("Scheduling preferred host timer task with 
host.lb.interval={}ms", checkInterval);
-            hostLBTimer = new Timer("Host LB Timer");
-            hostLBTimer.scheduleAtFixedRate(new PreferredHostCheckerTask(), 
checkInterval, checkInterval);
+            logger.info("Scheduling preferred host task with 
host.lb.interval={}ms", checkInterval);
+            hostLbCheckExecutor = 
Executors.newSingleThreadScheduledExecutor((new NamedThreadFactory(name)));
+            hostLbCheckExecutor.scheduleAtFixedRate(new 
PreferredHostCheckerTask(), checkInterval, checkInterval,
+                    TimeUnit.MILLISECONDS);
         }
     }
 
     public void scheduleWatch(final Link link, final Request request, final 
long delay, final long period) {
-        synchronized (_watchList) {
-            logger.debug("Adding task with request: {} to watch list", 
request.toString());
 
-            final WatchTask task = new WatchTask(link, request, this);
-            _timer.schedule(task, 0, period);
-            _watchList.add(task);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Adding a watch list");
         }
+        final WatchTask task = new WatchTask(link, request, this);
+        final ScheduledFuture<?> future = 
selfTaskExecutor.scheduleAtFixedRate(task, delay, period, 
TimeUnit.MILLISECONDS);
+        watchList.add(future);
     }
 
     public void triggerUpdate() {
-        PingCommand command = _resource.getCurrentStatus(getId());
+        PingCommand command = serverResource.getCurrentStatus(getId());
         command.setOutOfBand(true);
         logger.debug("Sending out of band ping");
-
-        final Request request = new Request(_id, -1, command, false);
+        final Request request = new Request(id, -1, command, false);
         request.setSequence(getNextSequence());
         try {
-            _link.send(request.toBytes());
+            link.send(request.toBytes());
         } catch (final ClosedChannelException e) {
             logger.warn("Unable to send ping update: {}", request.toString());
         }
     }
 
     protected void cancelTasks() {
-        synchronized (_watchList) {
-            for (final WatchTask task : _watchList) {
-                task.cancel();
-            }
-            logger.debug("Clearing {} tasks of watch list", _watchList.size());
-            _watchList.clear();
+        for (final ScheduledFuture<?> task : watchList) {
+            task.cancel(true);
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Clearing watch list: " + watchList.size());
         }

Review Comment:
   If you really want a guard here, you can use lazy logging.
   ```suggestion
           logger.debug("Clearing watch list: {}", () -> watchList.size());
   ```



##########
agent/src/main/java/com/cloud/agent/Agent.java:
##########
@@ -444,59 +466,87 @@ protected void cancelTasks() {
      * when host is added back
      */
     protected void cleanupAgentZoneProperties() {
-        _shell.setPersistentProperty(null, "zone", "");
-        _shell.setPersistentProperty(null, "cluster", "");
-        _shell.setPersistentProperty(null, "pod", "");
+        shell.setPersistentProperty(null, "zone", "");
+        shell.setPersistentProperty(null, "cluster", "");
+        shell.setPersistentProperty(null, "pod", "");
+    }
+
+    public void lockStartupTask(final Link link) {
+        logger.debug("Creating startup task for link: {}", getLinkLog(link));

Review Comment:
   Here it makes sense to use lazy logging.
   ```suggestion
           logger.debug("Creating startup task for link: {}", () -> 
getLinkLog(link));
   ```



##########
engine/orchestration/src/main/java/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java:
##########
@@ -67,119 +68,121 @@ public void processHostVmStatePingReport(long hostId, 
Map<String, HostVmStateRep
         processReport(hostId, translatedInfo, force);
     }
 
-    private void processReport(long hostId, Map<Long, 
VirtualMachine.PowerState> translatedInfo, boolean force) {
-
-        logger.debug("Process VM state report. host: {}, number of records in 
report: {}.", hostId, translatedInfo.size());
-
-        for (Map.Entry<Long, VirtualMachine.PowerState> entry : 
translatedInfo.entrySet()) {
-
-            logger.debug("VM state report. host: {}, vm id: {}, power state: 
{}.", hostId, entry.getKey(), entry.getValue());
-
-            if (_instanceDao.updatePowerState(entry.getKey(), hostId, 
entry.getValue(), DateUtil.currentGMTTime())) {
-                logger.debug("VM state report is updated. host: {}, vm id: {}, 
power state: {}.", hostId, entry.getKey(), entry.getValue());
-
-                _messageBus.publish(null, 
VirtualMachineManager.Topics.VM_POWER_STATE, PublishScope.GLOBAL, 
entry.getKey());
-            } else {
-                logger.trace("VM power state does not change, skip DB writing. 
vm id: {}.", entry.getKey());
+    private void updateAndPublishVmPowerStates(long hostId, Map<Long, 
VirtualMachine.PowerState> instancePowerStates,
+           Date updateTime) {
+        if (instancePowerStates.isEmpty()) {
+            return;
+        }
+        Set<Long> vmIds = instancePowerStates.keySet();
+        Map<Long, VirtualMachine.PowerState> notUpdated = 
_instanceDao.updatePowerState(instancePowerStates, hostId,
+                updateTime);
+        if (notUpdated.size() <= vmIds.size()) {
+            for (Long vmId : vmIds) {
+                if (!notUpdated.isEmpty() && !notUpdated.containsKey(vmId)) {
+                    logger.debug("VM state report is updated. host: {}, vm id: 
{}}, power state: {}}",
+                            hostId, vmId, instancePowerStates.get(vmId));
+                    _messageBus.publish(null, 
VirtualMachineManager.Topics.VM_POWER_STATE,
+                            PublishScope.GLOBAL, vmId);
+                    continue;
+                }
+                logger.trace("VM power state does not change, skip DB writing. 
vm id: {}", vmId);
             }
         }
+    }
 
+    private void processMissingVmReport(long hostId, Set<Long> vmIds, boolean 
force) {
         // any state outdates should be checked against the time before this 
list was retrieved
         Date startTime = DateUtil.currentGMTTime();
         // for all running/stopping VMs, we provide monitoring of missing 
report
-        List<VMInstanceVO> vmsThatAreMissingReport = 
_instanceDao.findByHostInStates(hostId, VirtualMachine.State.Running,
-                VirtualMachine.State.Stopping, VirtualMachine.State.Starting);
-        java.util.Iterator<VMInstanceVO> it = 
vmsThatAreMissingReport.iterator();
-        while (it.hasNext()) {
-            VMInstanceVO instance = it.next();
-            if (translatedInfo.get(instance.getId()) != null)
-                it.remove();
-        }
-
+        List<VMInstanceVO> vmsThatAreMissingReport = 
_instanceDao.findByHostInStatesExcluding(hostId, vmIds,
+                VirtualMachine.State.Running, VirtualMachine.State.Stopping, 
VirtualMachine.State.Starting);
         // here we need to be wary of out of band migration as opposed to 
other, more unexpected state changes
-        if (vmsThatAreMissingReport.size() > 0) {
-            Date currentTime = DateUtil.currentGMTTime();
-            logger.debug("Run missing VM report. current time: {}", 
currentTime.getTime());
-
-            // 2 times of sync-update interval for graceful period
-            long milliSecondsGracefullPeriod = 
mgmtServiceConf.getPingInterval() * 2000L;
-
-            for (VMInstanceVO instance : vmsThatAreMissingReport) {
-
-                // Make sure powerState is up to date for missing VMs
-                try {
-                    if (!force && 
!_instanceDao.isPowerStateUpToDate(instance.getId())) {
-                        logger.warn("Detected missing VM but power state is 
outdated, wait for another process report run for VM id: {}.", 
instance.getId());
-                        
_instanceDao.resetVmPowerStateTracking(instance.getId());
-                        continue;
-                    }
-                } catch (CloudRuntimeException e) {
-                    logger.warn("Checked for missing powerstate of a none 
existing vm", e);
-                    continue;
-                }
+        if (vmsThatAreMissingReport.isEmpty()) {
+            return;
+        }
+        Date currentTime = DateUtil.currentGMTTime();
+        if (logger.isDebugEnabled()) {
+            logger.debug("Run missing VM report. current time: " + 
currentTime.getTime());
+        }
+        if (!force) {
+            List<Long> outdatedVms = vmsThatAreMissingReport.stream()
+                    .filter(v -> !_instanceDao.isPowerStateUpToDate(v))
+                    .map(VMInstanceVO::getId)
+                    .collect(Collectors.toList());
+            _instanceDao.resetVmPowerStateTracking(outdatedVms);
+            vmsThatAreMissingReport = vmsThatAreMissingReport.stream()
+                    .filter(v -> !outdatedVms.contains(v.getId()))
+                    .collect(Collectors.toList());
+        }
 
-                Date vmStateUpdateTime = instance.getPowerStateUpdateTime();
+        // 2 times of sync-update interval for graceful period
+        long milliSecondsGracefulPeriod = mgmtServiceConf.getPingInterval() * 
2000L;
+        Map<Long, VirtualMachine.PowerState> instancePowerStates = new 
HashMap<>();
+        for (VMInstanceVO instance : vmsThatAreMissingReport) {
+            Date vmStateUpdateTime = instance.getPowerStateUpdateTime();
+            if (vmStateUpdateTime == null) {
+                logger.warn("VM power state update time is null, falling back 
to update time for vm id: " + instance.getId());

Review Comment:
   ```suggestion
                   logger.warn("VM power state update time is null, falling back to update time for vm id: {}", instance.getId());
   ```



##########
agent/src/main/java/com/cloud/agent/Agent.java:
##########
@@ -1136,22 +1175,16 @@ public void doTask(final Task task) throws 
TaskExecutionException {
                     } else {
                         //put the requests from mgt server into another thread 
pool, as the request may take a longer time to finish. Don't block the NIO main 
thread pool
                         //processRequest(request, task.getLink());
-                        _executor.submit(new AgentRequestHandler(getType(), 
getLink(), request));
+                        requestHandler.submit(new 
AgentRequestHandler(getType(), getLink(), request));
                     }
                 } catch (final ClassNotFoundException e) {
                     logger.error("Unable to find this request ");
                 } catch (final Exception e) {
                     logger.error("Error parsing task", e);
                 }
             } else if (task.getType() == Task.Type.DISCONNECT) {
-                try {
-                    // an issue has been found if reconnect immediately after 
disconnecting. please refer to https://github.com/apache/cloudstack/issues/8517
-                    // wait 5 seconds before reconnecting
-                    Thread.sleep(5000);
-                } catch (InterruptedException e) {
-                }
+                logger.debug("Executing disconnect task - {}", 
getLinkLog(task.getLink()));

Review Comment:
   ```suggestion
                   logger.debug("Executing disconnect task - {}", () -> 
getLinkLog(task.getLink()));
   ```



##########
engine/orchestration/src/main/java/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java:
##########
@@ -67,119 +68,121 @@ public void processHostVmStatePingReport(long hostId, 
Map<String, HostVmStateRep
         processReport(hostId, translatedInfo, force);
     }
 
-    private void processReport(long hostId, Map<Long, 
VirtualMachine.PowerState> translatedInfo, boolean force) {
-
-        logger.debug("Process VM state report. host: {}, number of records in 
report: {}.", hostId, translatedInfo.size());
-
-        for (Map.Entry<Long, VirtualMachine.PowerState> entry : 
translatedInfo.entrySet()) {
-
-            logger.debug("VM state report. host: {}, vm id: {}, power state: 
{}.", hostId, entry.getKey(), entry.getValue());
-
-            if (_instanceDao.updatePowerState(entry.getKey(), hostId, 
entry.getValue(), DateUtil.currentGMTTime())) {
-                logger.debug("VM state report is updated. host: {}, vm id: {}, 
power state: {}.", hostId, entry.getKey(), entry.getValue());
-
-                _messageBus.publish(null, 
VirtualMachineManager.Topics.VM_POWER_STATE, PublishScope.GLOBAL, 
entry.getKey());
-            } else {
-                logger.trace("VM power state does not change, skip DB writing. 
vm id: {}.", entry.getKey());
+    private void updateAndPublishVmPowerStates(long hostId, Map<Long, 
VirtualMachine.PowerState> instancePowerStates,
+           Date updateTime) {
+        if (instancePowerStates.isEmpty()) {
+            return;
+        }
+        Set<Long> vmIds = instancePowerStates.keySet();
+        Map<Long, VirtualMachine.PowerState> notUpdated = 
_instanceDao.updatePowerState(instancePowerStates, hostId,
+                updateTime);
+        if (notUpdated.size() <= vmIds.size()) {
+            for (Long vmId : vmIds) {
+                if (!notUpdated.isEmpty() && !notUpdated.containsKey(vmId)) {
+                    logger.debug("VM state report is updated. host: {}, vm id: 
{}}, power state: {}}",
+                            hostId, vmId, instancePowerStates.get(vmId));
+                    _messageBus.publish(null, 
VirtualMachineManager.Topics.VM_POWER_STATE,
+                            PublishScope.GLOBAL, vmId);
+                    continue;
+                }
+                logger.trace("VM power state does not change, skip DB writing. 
vm id: {}", vmId);
             }
         }
+    }
 
+    private void processMissingVmReport(long hostId, Set<Long> vmIds, boolean 
force) {
         // any state outdates should be checked against the time before this 
list was retrieved
         Date startTime = DateUtil.currentGMTTime();
         // for all running/stopping VMs, we provide monitoring of missing 
report
-        List<VMInstanceVO> vmsThatAreMissingReport = 
_instanceDao.findByHostInStates(hostId, VirtualMachine.State.Running,
-                VirtualMachine.State.Stopping, VirtualMachine.State.Starting);
-        java.util.Iterator<VMInstanceVO> it = 
vmsThatAreMissingReport.iterator();
-        while (it.hasNext()) {
-            VMInstanceVO instance = it.next();
-            if (translatedInfo.get(instance.getId()) != null)
-                it.remove();
-        }
-
+        List<VMInstanceVO> vmsThatAreMissingReport = 
_instanceDao.findByHostInStatesExcluding(hostId, vmIds,
+                VirtualMachine.State.Running, VirtualMachine.State.Stopping, 
VirtualMachine.State.Starting);
         // here we need to be wary of out of band migration as opposed to 
other, more unexpected state changes
-        if (vmsThatAreMissingReport.size() > 0) {
-            Date currentTime = DateUtil.currentGMTTime();
-            logger.debug("Run missing VM report. current time: {}", 
currentTime.getTime());
-
-            // 2 times of sync-update interval for graceful period
-            long milliSecondsGracefullPeriod = 
mgmtServiceConf.getPingInterval() * 2000L;
-
-            for (VMInstanceVO instance : vmsThatAreMissingReport) {
-
-                // Make sure powerState is up to date for missing VMs
-                try {
-                    if (!force && 
!_instanceDao.isPowerStateUpToDate(instance.getId())) {
-                        logger.warn("Detected missing VM but power state is 
outdated, wait for another process report run for VM id: {}.", 
instance.getId());
-                        
_instanceDao.resetVmPowerStateTracking(instance.getId());
-                        continue;
-                    }
-                } catch (CloudRuntimeException e) {
-                    logger.warn("Checked for missing powerstate of a none 
existing vm", e);
-                    continue;
-                }
+        if (vmsThatAreMissingReport.isEmpty()) {
+            return;
+        }
+        Date currentTime = DateUtil.currentGMTTime();
+        if (logger.isDebugEnabled()) {
+            logger.debug("Run missing VM report. current time: " + 
currentTime.getTime());
+        }
+        if (!force) {
+            List<Long> outdatedVms = vmsThatAreMissingReport.stream()
+                    .filter(v -> !_instanceDao.isPowerStateUpToDate(v))
+                    .map(VMInstanceVO::getId)
+                    .collect(Collectors.toList());
+            _instanceDao.resetVmPowerStateTracking(outdatedVms);
+            vmsThatAreMissingReport = vmsThatAreMissingReport.stream()
+                    .filter(v -> !outdatedVms.contains(v.getId()))
+                    .collect(Collectors.toList());
+        }
 
-                Date vmStateUpdateTime = instance.getPowerStateUpdateTime();
+        // 2 times of sync-update interval for graceful period
+        long milliSecondsGracefulPeriod = mgmtServiceConf.getPingInterval() * 
2000L;
+        Map<Long, VirtualMachine.PowerState> instancePowerStates = new 
HashMap<>();
+        for (VMInstanceVO instance : vmsThatAreMissingReport) {
+            Date vmStateUpdateTime = instance.getPowerStateUpdateTime();
+            if (vmStateUpdateTime == null) {
+                logger.warn("VM power state update time is null, falling back 
to update time for vm id: " + instance.getId());
+                vmStateUpdateTime = instance.getUpdateTime();
                 if (vmStateUpdateTime == null) {
-                    logger.warn("VM power state update time is null, falling 
back to update time for vm id: {}.", instance.getId());
-                    vmStateUpdateTime = instance.getUpdateTime();
-                    if (vmStateUpdateTime == null) {
-                        logger.warn("VM update time is null, falling back to 
creation time for vm id: {}", instance.getId());
-                        vmStateUpdateTime = instance.getCreated();
-                    }
+                    logger.warn("VM update time is null, falling back to 
creation time for vm id: " + instance.getId());

Review Comment:
   ```suggestion
                       logger.warn("VM update time is null, falling back to 
creation time for vm id: {}", instance.getId());
   ```



##########
engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java:
##########
@@ -4955,19 +4965,36 @@ private void 
handlePowerOffReportWithNoPendingJobsOnVM(final VMInstanceVO vm) {
     }
 
     private void scanStalledVMInTransitionStateOnUpHost(final long hostId) {
-        final long stallThresholdInMs = VmJobStateReportInterval.value() + 
(VmJobStateReportInterval.value() >> 1);
-        final Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - 
stallThresholdInMs);
-        final List<Long> mostlikelyStoppedVMs = 
listStalledVMInTransitionStateOnUpHost(hostId, cutTime);
-        for (final Long vmId : mostlikelyStoppedVMs) {
-            final VMInstanceVO vm = _vmDao.findById(vmId);
-            assert vm != null;
+        if (!syncTransitioningVmPowerState) {
+            return;
+        }
+        // Check VM that is stuck in Starting, Stopping, Migrating states, we 
won't check
+        // VMs in expunging state (this need to be handled specially)
+        //
+        // checking condition
+        //      1) no pending VmWork job
+        //      2) on hostId host and host is UP
+        //
+        // When host is UP, sooner or later we will get a report from the host 
about the VM,
+        // however, if VM is missing from the host report (it may happen in 
out of band changes
+        // or from behaviour of XS/KVM by design), the VM may not get a chance 
to run the state-sync logic
+        //
+        // Therefore, we will scan those VMs on UP host based on last update 
timestamp, if the host is UP
+        // and a VM stalls for status update, we will consider them to be 
powered off
+        // (which is relatively safe to do so)

Review Comment:
   Could you change this to a Javadoc? This explanation is too important to be 
left out.



##########
server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java:
##########
@@ -407,23 +415,16 @@ public void 
doInTransactionWithoutResult(TransactionStatus status) {
     }
 
     @Override
-    public boolean checkIfHostHasCpuCapability(long hostId, Integer cpuNum, 
Integer cpuSpeed) {
-
+    public boolean checkIfHostHasCpuCapability(Host host, Integer cpuNum, 
Integer cpuSpeed) {
         // Check host can support the Cpu Number and Speed.
-        Host host = _hostDao.findById(hostId);
         boolean isCpuNumGood = host.getCpus().intValue() >= cpuNum;
         boolean isCpuSpeedGood = host.getSpeed().intValue() >= cpuSpeed;
         if (isCpuNumGood && isCpuSpeedGood) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Host: " + hostId + " has cpu capability (cpu:" + 
host.getCpus() + ", speed:" + host.getSpeed() +
-                    ") to support requested CPU: " + cpuNum + " and requested 
speed: " + cpuSpeed);
-            }
+            logger.debug("{} has cpu capability (cpu: {}, speed: {} ) to 
support requested CPU: {} and requested speed: {}",
+                    host, host.getCpus(), host.getSpeed(), cpuNum, cpuSpeed);
             return true;
-        } else {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Host: " + hostId + " doesn't have cpu capability 
(cpu:" + host.getCpus() + ", speed:" + host.getSpeed() +
-                    ") to support requested CPU: " + cpuNum + " and requested 
speed: " + cpuSpeed);
-            }
+        } else {logger.debug("{} doesn't have cpu capability (cpu: {}, speed: 
{} ) to support requested CPU: {} and requested speed: {}",
+                host, host.getCpus(), host.getSpeed(), cpuNum, cpuSpeed);
             return false;
         }
     }

Review Comment:
   I'm not able to make a suggestion on deleted lines, but you could simplify 
this method as below:
   ```
       public boolean checkIfHostHasCpuCapability(Host host, Integer cpuNum, 
Integer cpuSpeed) {
           // Check host can support the Cpu Number and Speed.
           boolean isCpuNumGood = host.getCpus().intValue() >= cpuNum;
           boolean isCpuSpeedGood = host.getSpeed().intValue() >= cpuSpeed;
           boolean hasCpuCapability = isCpuNumGood && isCpuSpeedGood;
   
           logger.debug("{} {} cpu capability (cpu: {}, speed: {} ) to support 
requested CPU: {} and requested speed: {}",
                   host, hasCpuCapability ? "has" : "doesn't have" 
,host.getCpus(), host.getSpeed(), cpuNum, cpuSpeed);
   
           return hasCpuCapability;
       }
   ```



##########
utils/src/main/java/com/cloud/utils/nio/NioConnection.java:
##########
@@ -202,57 +237,64 @@ protected void accept(final SelectionKey key) throws 
IOException {
             logger.trace("Connection accepted for " + socket);
         }
 
-        final SSLEngine sslEngine;
         try {
-            sslEngine = Link.initServerSSLEngine(caService, 
socketChannel.getRemoteAddress().toString());
-            sslEngine.setUseClientMode(false);
-            
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
             final NioConnection nioConnection = this;
-            _sslHandshakeExecutor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    _selector.wakeup();
-                    try {
-                        sslEngine.beginHandshake();
-                        if (!Link.doHandshake(socketChannel, sslEngine)) {
-                            throw new IOException("SSL handshake timed out 
with " + socketChannel.getRemoteAddress());
-                        }
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("SSL: Handshake done");
-                        }
-                        final InetSocketAddress saddr = 
(InetSocketAddress)socket.getRemoteSocketAddress();
-                        final Link link = new Link(saddr, nioConnection);
-                        link.setSSLEngine(sslEngine);
-                        link.setKey(socketChannel.register(key.selector(), 
SelectionKey.OP_READ, link));
-                        final Task task = _factory.create(Task.Type.CONNECT, 
link, null);
-                        registerLink(saddr, link);
-                        _executor.submit(task);
-                    } catch (IOException e) {
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("Connection closed due to failure: " 
+ e.getMessage());
-                        }
-                        closeAutoCloseable(socket, "accepting socket");
-                        closeAutoCloseable(socketChannel, "accepting 
socketChannel");
-                    } finally {
-                        _selector.wakeup();
+            _sslHandshakeExecutor.submit(() -> {
+                final InetSocketAddress socketAddress = 
(InetSocketAddress)socket.getRemoteSocketAddress();
+                activeAcceptConnections.incrementAndGet();
+                long startTime = System.currentTimeMillis();
+                _selector.wakeup();
+                try {
+                    final SSLEngine sslEngine = 
Link.initServerSSLEngine(caService, 
socketChannel.getRemoteAddress().toString());
+                    sslEngine.setUseClientMode(false);
+                    
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
+                    sslEngine.beginHandshake();
+                    if (!Link.doHandshake(socketChannel, sslEngine, 
getSslHandshakeTimeout())) {
+                        throw new IOException("SSL handshake timed out with " 
+ socketAddress);
+                    }
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("SSL: Handshake done");
                     }

Review Comment:
   No need for a guard here, but if you really want to, you could use lazy 
logging.
   ```suggestion
                       logger.trace("SSL: Handshake done");
   ```



##########
utils/src/main/java/com/cloud/utils/nio/NioConnection.java:
##########
@@ -190,9 +209,25 @@ public Boolean call() throws NioConnectionException {
 
     abstract void unregisterLink(InetSocketAddress saddr);
 
+    protected boolean rejectConnectionIfBusy(final SocketChannel 
socketChannel) throws IOException {
+        if (activeAcceptConnections.get() < sslHandshakeMaxWorkers) {
+            return false;
+        }
+        // Reject new connection if the server is busy
+        logger.warn(String.format("%s Rejecting new connection. %d active 
connections currently",
+                SERVER_BUSY_MESSAGE, sslHandshakeMaxWorkers));

Review Comment:
   ```suggestion
           logger.warn("{} Rejecting new connection. {} active connections 
currently", SERVER_BUSY_MESSAGE, sslHandshakeMaxWorkers);
   ```



##########
utils/src/main/java/com/cloud/utils/nio/NioClient.java:
##########
@@ -33,73 +33,84 @@
 
 public class NioClient extends NioConnection {
 
-    protected String _host;
-    protected SocketChannel _clientConnection;
+    protected String host;
+    protected SocketChannel clientConnection;
 
-    public NioClient(final String name, final String host, final int port, 
final int workers, final HandlerFactory factory) {
-        super(name, port, workers, factory);
-        _host = host;
+    public NioClient(final String name, final String host, final int port, 
final int workers, final Integer sslHandshakeTimeout, final HandlerFactory 
factory) {
+        super(name, port, workers, 1, 2, factory);
+        setSslHandshakeTimeout(sslHandshakeTimeout);
+        this.host = host;
+    }
+
+    protected void closeChannel() {
+        try {
+            if (clientConnection != null && clientConnection.isOpen()) {
+                clientConnection.close();
+            }
+        } catch (IOException e) {
+            logger.error("Failed to close SocketChannel", e);
+        }
     }
 
     @Override
     protected void init() throws IOException {
-        _selector = Selector.open();
-        Task task = null;
-
+        Task task;
+        String hostLog = host + ":" + _port;
         try {
-            _clientConnection = SocketChannel.open();
-
-            logger.info("Connecting to " + _host + ":" + _port);
-            final InetSocketAddress peerAddr = new InetSocketAddress(_host, 
_port);
-            _clientConnection.connect(peerAddr);
-            _clientConnection.configureBlocking(false);
+            logger.info("Connecting to {}", hostLog);
+            _selector = Selector.open();
+            clientConnection = SocketChannel.open();
+            final InetSocketAddress serverAddress = new 
InetSocketAddress(host, _port);
+            clientConnection.connect(serverAddress);
+            logger.info("Connected to {}", hostLog);
+            clientConnection.configureBlocking(false);
 
             final SSLContext sslContext = Link.initClientSSLContext();
-            SSLEngine sslEngine = sslContext.createSSLEngine(_host, _port);
+            SSLEngine sslEngine = sslContext.createSSLEngine(host, _port);
             sslEngine.setUseClientMode(true);
             
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
             sslEngine.beginHandshake();
-            if (!Link.doHandshake(_clientConnection, sslEngine)) {
-                logger.error("SSL Handshake failed while connecting to host: " 
+ _host + " port: " + _port);
-                _selector.close();
-                throw new IOException("SSL Handshake failed while connecting 
to host: " + _host + " port: " + _port);
+            if (!Link.doHandshake(clientConnection, sslEngine, 
getSslHandshakeTimeout())) {
+                throw new IOException(String.format("SSL Handshake failed 
while connecting to host: %s", hostLog));
             }
             logger.info("SSL: Handshake done");
-            logger.info("Connected to " + _host + ":" + _port);
 
-            final Link link = new Link(peerAddr, this);
+            final Link link = new Link(serverAddress, this);
             link.setSSLEngine(sslEngine);
-            final SelectionKey key = _clientConnection.register(_selector, 
SelectionKey.OP_READ);
+            final SelectionKey key = clientConnection.register(_selector, 
SelectionKey.OP_READ);
             link.setKey(key);
             key.attach(link);
             // Notice we've already connected due to the handshake, so let's 
get the
             // remaining task done
             task = _factory.create(Task.Type.CONNECT, link, null);
         } catch (final GeneralSecurityException e) {
-            _selector.close();
+            closeChannel();
             throw new IOException("Failed to initialise security", e);
         } catch (final IOException e) {
-            _selector.close();
+            closeChannel();
+            logger.error(String.format("IOException while connecting to %s", 
hostLog), e);

Review Comment:
   ```suggestion
               logger.error("IOException while connecting to {}", hostLog, e);
   ```



##########
utils/src/main/java/com/cloud/utils/nio/NioConnection.java:
##########
@@ -202,57 +237,64 @@ protected void accept(final SelectionKey key) throws 
IOException {
             logger.trace("Connection accepted for " + socket);
         }
 
-        final SSLEngine sslEngine;
         try {
-            sslEngine = Link.initServerSSLEngine(caService, 
socketChannel.getRemoteAddress().toString());
-            sslEngine.setUseClientMode(false);
-            
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
             final NioConnection nioConnection = this;
-            _sslHandshakeExecutor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    _selector.wakeup();
-                    try {
-                        sslEngine.beginHandshake();
-                        if (!Link.doHandshake(socketChannel, sslEngine)) {
-                            throw new IOException("SSL handshake timed out 
with " + socketChannel.getRemoteAddress());
-                        }
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("SSL: Handshake done");
-                        }
-                        final InetSocketAddress saddr = 
(InetSocketAddress)socket.getRemoteSocketAddress();
-                        final Link link = new Link(saddr, nioConnection);
-                        link.setSSLEngine(sslEngine);
-                        link.setKey(socketChannel.register(key.selector(), 
SelectionKey.OP_READ, link));
-                        final Task task = _factory.create(Task.Type.CONNECT, 
link, null);
-                        registerLink(saddr, link);
-                        _executor.submit(task);
-                    } catch (IOException e) {
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("Connection closed due to failure: " 
+ e.getMessage());
-                        }
-                        closeAutoCloseable(socket, "accepting socket");
-                        closeAutoCloseable(socketChannel, "accepting 
socketChannel");
-                    } finally {
-                        _selector.wakeup();
+            _sslHandshakeExecutor.submit(() -> {
+                final InetSocketAddress socketAddress = 
(InetSocketAddress)socket.getRemoteSocketAddress();
+                activeAcceptConnections.incrementAndGet();
+                long startTime = System.currentTimeMillis();
+                _selector.wakeup();
+                try {
+                    final SSLEngine sslEngine = 
Link.initServerSSLEngine(caService, 
socketChannel.getRemoteAddress().toString());
+                    sslEngine.setUseClientMode(false);
+                    
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
+                    sslEngine.beginHandshake();
+                    if (!Link.doHandshake(socketChannel, sslEngine, 
getSslHandshakeTimeout())) {
+                        throw new IOException("SSL handshake timed out with " 
+ socketAddress);
+                    }
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("SSL: Handshake done");
                     }
+                    final Link link = new Link(socketAddress, nioConnection);
+                    link.setSSLEngine(sslEngine);
+                    link.setKey(socketChannel.register(key.selector(), 
SelectionKey.OP_READ, link));
+                    final Task task = _factory.create(Task.Type.CONNECT, link, 
null);
+                    registerLink(socketAddress, link);
+                    _executor.submit(task);
+                } catch (final GeneralSecurityException | IOException e) {
+                    if (logger.isTraceEnabled()) {
+                        logger.trace(socket.getRemoteSocketAddress()+ 
"Connection closed due to failure: " + e.getMessage());
+                    }

Review Comment:
   ```suggestion
                       logger.trace("{} Connection closed due to failure: {}", 
socket::getRemoteSocketAddress, e::getMessage);
   ```



##########
engine/schema/src/main/java/com/cloud/upgrade/dao/DatabaseAccessObject.java:
##########
@@ -114,6 +114,17 @@ public void createIndex(Connection conn, String tableName, 
String indexName, Str
         }
     }
 
+    public void renameIndex(Connection conn, String tableName, String oldName, 
String newName) {
+        String stmt = String.format("ALTER TABLE %s RENAME INDEX %s TO %s", 
tableName, oldName, newName);

Review Comment:
   Couldn't you use the `cloud`.`IDEMPOTENT_CHANGE_COLUMN` procedure here? Or 
maybe create a new procedure for this.



##########
agent/src/main/java/com/cloud/agent/Agent.java:
##########
@@ -125,128 +128,134 @@ public int value() {
         }
     }
 
-    List<IAgentControlListener> _controlListeners = new 
ArrayList<IAgentControlListener>();
+    CopyOnWriteArrayList<IAgentControlListener> controlListeners = new 
CopyOnWriteArrayList<>();
 
-    IAgentShell _shell;
-    NioConnection _connection;
-    ServerResource _resource;
-    Link _link;
-    Long _id;
+    IAgentShell shell;
+    NioConnection connection;
+    ServerResource serverResource;
+    Link link;
+    Long id;
 
-    Timer _timer = new Timer("Agent Timer");
-    Timer certTimer;
-    Timer hostLBTimer;
+    ScheduledExecutorService selfTaskExecutor;
+    ScheduledExecutorService certExecutor;
+    ScheduledExecutorService hostLbCheckExecutor;
 
-    List<WatchTask> _watchList = new ArrayList<WatchTask>();
-    long _sequence = 0;
-    long _lastPingResponseTime = 0;
-    long _pingInterval = 0;
-    AtomicInteger _inProgress = new AtomicInteger();
+    CopyOnWriteArrayList<ScheduledFuture<?>> watchList = new 
CopyOnWriteArrayList<>();
+    AtomicLong sequence = new AtomicLong(0);
+    AtomicLong lastPingResponseTime = new AtomicLong(0L);
+    long pingInterval = 0;
+    AtomicInteger commandsInProgress = new AtomicInteger(0);
 
-    StartupTask _startup = null;
-    long _startupWaitDefault = 180000;
-    long _startupWait = _startupWaitDefault;
-    boolean _reconnectAllowed = true;
-    //For time sentitive task, e.g. PingTask
-    ThreadPoolExecutor _ugentTaskPool;
-    ExecutorService _executor;
+    private final AtomicReference<StartupTask> startupTask = new 
AtomicReference<>();
+    private static final long DEFAULT_STARTUP_WAIT = 180;
+    long startupWait = DEFAULT_STARTUP_WAIT;
+    boolean reconnectAllowed = true;
 
-    Thread _shutdownThread = new ShutdownThread(this);
+    //For time sensitive task, e.g. PingTask
+    ThreadPoolExecutor outRequestHandler;
+    ExecutorService requestHandler;
 
-    private String _keystoreSetupPath;
-    private String _keystoreCertImportPath;
+    Thread shutdownThread = new ShutdownThread(this);
 
-    // for simulator use only
-    public Agent(final IAgentShell shell) {
-        _shell = shell;
-        _link = null;
+    private String keystoreSetupSetupPath;
+    private String keystoreCertImportScriptPath;
 
-        _connection = new NioClient("Agent", _shell.getNextHost(), 
_shell.getPort(), _shell.getWorkers(), this);
+    private String hostname;
 
-        Runtime.getRuntime().addShutdownHook(_shutdownThread);
+    protected String getLinkLog(final Link link) {
+        if (link == null) {
+            return "";
+        }
+        StringBuilder str = new StringBuilder();
+        if (logger.isTraceEnabled()) {
+            str.append(System.identityHashCode(link)).append("-");
+        }
+        str.append(link.getSocketAddress());
+        return str.toString();
+    }
 
-        _ugentTaskPool =
-                new ThreadPoolExecutor(shell.getPingRetries(), 2 * 
shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), 
new NamedThreadFactory(
-                        "UgentTask"));
+    protected String getAgentName() {
+        return (serverResource != null && 
serverResource.isAppendAgentNameToLogs() &&
+                StringUtils.isNotBlank(serverResource.getName())) ?
+                serverResource.getName() :
+                "Agent";
+    }
 
-        _executor =
-                new ThreadPoolExecutor(_shell.getWorkers(), 5 * 
_shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory(
-                        "agentRequest-Handler"));
+    protected void setupShutdownHookAndInitExecutors() {
+        if (logger.isTraceEnabled()) {
+            logger.trace("Adding shutdown hook");
+        }
+        Runtime.getRuntime().addShutdownHook(shutdownThread);
+        selfTaskExecutor = Executors.newScheduledThreadPool(1, new 
NamedThreadFactory("Agent-SelfTask"));
+        outRequestHandler = new ThreadPoolExecutor(shell.getPingRetries(), 2 * 
shell.getPingRetries(), 10, TimeUnit.MINUTES,
+                new SynchronousQueue<>(), new 
NamedThreadFactory("AgentOutRequest-Handler"));
+        requestHandler = new ThreadPoolExecutor(shell.getWorkers(), 5 * 
shell.getWorkers(), 1, TimeUnit.DAYS,
+                new LinkedBlockingQueue<>(), new 
NamedThreadFactory("AgentRequest-Handler"));
     }
 
-    public Agent(final IAgentShell shell, final int localAgentId, final 
ServerResource resource) throws ConfigurationException {
-        _shell = shell;
-        _resource = resource;
-        _link = null;
+    // for simulator use only
+    public Agent(final IAgentShell shell) {
+        this.shell = shell;
+        link = null;
+        connection = new NioClient(getAgentName(), this.shell.getNextHost(), 
this.shell.getPort(),
+                this.shell.getWorkers(), this.shell.getSslHandshakeTimeout(), 
this);
+        setupShutdownHookAndInitExecutors();
+    }
 
+    public Agent(final IAgentShell shell, final int localAgentId, final 
ServerResource resource) throws ConfigurationException {
+        this.shell = shell;
+        serverResource = resource;
+        link = null;
         resource.setAgentControl(this);
-
-        final String value = _shell.getPersistentProperty(getResourceName(), 
"id");
-        _id = value != null ? Long.parseLong(value) : null;
-        logger.info("id is {}", ObjectUtils.defaultIfNull(_id, ""));
-
+        final String value = 
this.shell.getPersistentProperty(getResourceName(), "id");
+        id = value != null ? Long.parseLong(value) : null;
+        logger.info("id is {}", (id != null ? id : ""));
         final Map<String, Object> params = new HashMap<>();
-
         // merge with properties from command line to let resource access 
command line parameters
-        for (final Map.Entry<String, Object> cmdLineProp : 
_shell.getCmdLineProperties().entrySet()) {
+        for (final Map.Entry<String, Object> cmdLineProp : 
this.shell.getCmdLineProperties().entrySet()) {
             params.put(cmdLineProp.getKey(), cmdLineProp.getValue());
         }
-
-        if (!_resource.configure(getResourceName(), params)) {
-            throw new ConfigurationException("Unable to configure " + 
_resource.getName());
-        }
-
-        final String host = _shell.getNextHost();
-        _connection = new NioClient("Agent", host, _shell.getPort(), 
_shell.getWorkers(), this);
-
-        // ((NioClient)_connection).setBindAddress(_shell.getPrivateIp());
-
-        logger.debug("Adding shutdown hook");
-        Runtime.getRuntime().addShutdownHook(_shutdownThread);
-
-        _ugentTaskPool =
-                new ThreadPoolExecutor(shell.getPingRetries(), 2 * 
shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), 
new NamedThreadFactory(
-                        "UgentTask"));
-
-        _executor =
-                new ThreadPoolExecutor(_shell.getWorkers(), 5 * 
_shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory(
-                        "agentRequest-Handler"));
-
-        logger.info("Agent [id = {} : type = {} : zone = {} : pod = {} : 
workers = {} : host = {} : port = {}", ObjectUtils.defaultIfNull(_id, "new"), 
getResourceName(),
-                 _shell.getZone(), _shell.getPod(), _shell.getWorkers(), host, 
_shell.getPort());
+        if (!serverResource.configure(getResourceName(), params)) {
+            throw new ConfigurationException("Unable to configure " + 
serverResource.getName());
+        }
+        ThreadContext.put("agentname", getAgentName());
+        final String host = this.shell.getNextHost();
+        connection = new NioClient(getAgentName(), host, this.shell.getPort(), 
this.shell.getWorkers(),
+                this.shell.getSslHandshakeTimeout(), this);
+        setupShutdownHookAndInitExecutors();
+        logger.info("Agent [id = {}, type = {}, zone = {}, pod = {}, workers = 
{}, host = {}, " +
+                        "port = {}, local id = {}]",
+                (id != null ? String.valueOf(id) : "new"), getResourceName(), 
this.shell.getZone(),
+                this.shell.getPod(), this.shell.getWorkers(), host, 
this.shell.getPort(), localAgentId);

Review Comment:
   You could create a `toString` method for the `Agent` class here; it should 
make this message easier to write.



##########
utils/src/main/java/com/cloud/utils/nio/NioConnection.java:
##########
@@ -202,57 +237,64 @@ protected void accept(final SelectionKey key) throws 
IOException {
             logger.trace("Connection accepted for " + socket);
         }
 
-        final SSLEngine sslEngine;
         try {
-            sslEngine = Link.initServerSSLEngine(caService, 
socketChannel.getRemoteAddress().toString());
-            sslEngine.setUseClientMode(false);
-            
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
             final NioConnection nioConnection = this;
-            _sslHandshakeExecutor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    _selector.wakeup();
-                    try {
-                        sslEngine.beginHandshake();
-                        if (!Link.doHandshake(socketChannel, sslEngine)) {
-                            throw new IOException("SSL handshake timed out 
with " + socketChannel.getRemoteAddress());
-                        }
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("SSL: Handshake done");
-                        }
-                        final InetSocketAddress saddr = 
(InetSocketAddress)socket.getRemoteSocketAddress();
-                        final Link link = new Link(saddr, nioConnection);
-                        link.setSSLEngine(sslEngine);
-                        link.setKey(socketChannel.register(key.selector(), 
SelectionKey.OP_READ, link));
-                        final Task task = _factory.create(Task.Type.CONNECT, 
link, null);
-                        registerLink(saddr, link);
-                        _executor.submit(task);
-                    } catch (IOException e) {
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("Connection closed due to failure: " 
+ e.getMessage());
-                        }
-                        closeAutoCloseable(socket, "accepting socket");
-                        closeAutoCloseable(socketChannel, "accepting 
socketChannel");
-                    } finally {
-                        _selector.wakeup();
+            _sslHandshakeExecutor.submit(() -> {
+                final InetSocketAddress socketAddress = 
(InetSocketAddress)socket.getRemoteSocketAddress();
+                activeAcceptConnections.incrementAndGet();
+                long startTime = System.currentTimeMillis();
+                _selector.wakeup();
+                try {
+                    final SSLEngine sslEngine = 
Link.initServerSSLEngine(caService, 
socketChannel.getRemoteAddress().toString());
+                    sslEngine.setUseClientMode(false);
+                    
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
+                    sslEngine.beginHandshake();
+                    if (!Link.doHandshake(socketChannel, sslEngine, 
getSslHandshakeTimeout())) {
+                        throw new IOException("SSL handshake timed out with " 
+ socketAddress);
+                    }
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("SSL: Handshake done");
                     }
+                    final Link link = new Link(socketAddress, nioConnection);
+                    link.setSSLEngine(sslEngine);
+                    link.setKey(socketChannel.register(key.selector(), 
SelectionKey.OP_READ, link));
+                    final Task task = _factory.create(Task.Type.CONNECT, link, 
null);
+                    registerLink(socketAddress, link);
+                    _executor.submit(task);
+                } catch (final GeneralSecurityException | IOException e) {
+                    if (logger.isTraceEnabled()) {
+                        logger.trace(socket.getRemoteSocketAddress()+ 
"Connection closed due to failure: " + e.getMessage());
+                    }
+                    closeAutoCloseable(socket, "accepting socket");
+                    closeAutoCloseable(socketChannel, "accepting 
socketChannel");
+                } finally {
+                    int connections = 
activeAcceptConnections.decrementAndGet();
+                    if (logger.isTraceEnabled()) {
+                        logger.trace(String.format("Accept task complete for 
%s - time taken: %d, " +
+                                        "active accept connections: %d",
+                                socketAddress, (System.currentTimeMillis() - 
startTime), connections));
+                    }
+                    _selector.wakeup();
                 }
             });
-        } catch (final Exception e) {
+        } catch (final RejectedExecutionException e) {
             if (logger.isTraceEnabled()) {
-                logger.trace("Connection closed due to failure: " + 
e.getMessage());
+                logger.trace("{} Accept Task rejected: {}", 
socket.getRemoteSocketAddress(), e.getMessage());
             }
-            closeAutoCloseable(socket, "accepting socket");
-            closeAutoCloseable(socketChannel, "accepting socketChannel");
+            closeAutoCloseable(socket, "Rejecting connection - accepting 
socket");
+            closeAutoCloseable(socketChannel, "Rejecting connection - 
accepting socketChannel");
         } finally {
             _selector.wakeup();
         }
     }
 
-    protected void terminate(final SelectionKey key) {
+    protected void terminate(final SelectionKey key, String msg) {
         final Link link = (Link)key.attachment();
         closeConnection(key);
         if (link != null) {
+            if (logger.isTraceEnabled()) {
+                logger.warn("Will terminate connection due to: " + msg);
+            }

Review Comment:
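   Same here, no need for a guard. The guard also checks `isTraceEnabled()` 
while the message is logged at WARN level, so the two do not match.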
   ```suggestion
               logger.warn("Will terminate connection due to: {}", msg);
   ```



##########
engine/orchestration/src/main/java/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java:
##########
@@ -67,119 +68,121 @@ public void processHostVmStatePingReport(long hostId, 
Map<String, HostVmStateRep
         processReport(hostId, translatedInfo, force);
     }
 
-    private void processReport(long hostId, Map<Long, 
VirtualMachine.PowerState> translatedInfo, boolean force) {
-
-        logger.debug("Process VM state report. host: {}, number of records in 
report: {}.", hostId, translatedInfo.size());
-
-        for (Map.Entry<Long, VirtualMachine.PowerState> entry : 
translatedInfo.entrySet()) {
-
-            logger.debug("VM state report. host: {}, vm id: {}, power state: 
{}.", hostId, entry.getKey(), entry.getValue());
-
-            if (_instanceDao.updatePowerState(entry.getKey(), hostId, 
entry.getValue(), DateUtil.currentGMTTime())) {
-                logger.debug("VM state report is updated. host: {}, vm id: {}, 
power state: {}.", hostId, entry.getKey(), entry.getValue());
-
-                _messageBus.publish(null, 
VirtualMachineManager.Topics.VM_POWER_STATE, PublishScope.GLOBAL, 
entry.getKey());
-            } else {
-                logger.trace("VM power state does not change, skip DB writing. 
vm id: {}.", entry.getKey());
+    private void updateAndPublishVmPowerStates(long hostId, Map<Long, 
VirtualMachine.PowerState> instancePowerStates,
+           Date updateTime) {
+        if (instancePowerStates.isEmpty()) {
+            return;
+        }
+        Set<Long> vmIds = instancePowerStates.keySet();
+        Map<Long, VirtualMachine.PowerState> notUpdated = 
_instanceDao.updatePowerState(instancePowerStates, hostId,
+                updateTime);
+        if (notUpdated.size() <= vmIds.size()) {
+            for (Long vmId : vmIds) {
+                if (!notUpdated.isEmpty() && !notUpdated.containsKey(vmId)) {
+                    logger.debug("VM state report is updated. host: {}, vm id: 
{}}, power state: {}}",
+                            hostId, vmId, instancePowerStates.get(vmId));
+                    _messageBus.publish(null, 
VirtualMachineManager.Topics.VM_POWER_STATE,
+                            PublishScope.GLOBAL, vmId);
+                    continue;
+                }
+                logger.trace("VM power state does not change, skip DB writing. 
vm id: {}", vmId);
             }
         }

Review Comment:
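   Could invert the `if` and reduce indentation. The debug message also has a 
stray `}` after two of its `{}` placeholders (`{}}`), which would be printed 
literally.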
   ```suggestion
           Map<Long, VirtualMachine.PowerState> notUpdated = 
_instanceDao.updatePowerState(instancePowerStates, hostId, updateTime);
   
           if (notUpdated.size() > vmIds.size()) {
               return;
           }
   
           for (Long vmId : vmIds) {
               if (!notUpdated.isEmpty() && !notUpdated.containsKey(vmId)) {
                   logger.debug("VM state report is updated. host: {}, vm id: 
{}, power state: {}", hostId, vmId, instancePowerStates.get(vmId));
                   _messageBus.publish(null, 
VirtualMachineManager.Topics.VM_POWER_STATE, PublishScope.GLOBAL, vmId);
                   continue;
               }
               logger.trace("VM power state does not change, skip DB writing. 
vm id: {}", vmId);
           }
   ```



##########
agent/src/main/java/com/cloud/agent/Agent.java:
##########
@@ -1064,34 +1104,33 @@ protected void runInContext() {
         }
     }
 
-    public class StartupTask extends ManagedContextTimerTask {
+    public class StartupTask implements Runnable {
         protected Link _link;
-        protected volatile boolean cancelled = false;
+        private final AtomicBoolean cancelled = new AtomicBoolean(false);
 
         public StartupTask(final Link link) {
             logger.debug("Startup task created");
             _link = link;
         }
 
-        @Override
-        public synchronized boolean cancel() {
+        public boolean cancel() {
             // TimerTask.cancel may fail depends on the calling context
-            if (!cancelled) {
-                cancelled = true;
-                _startupWait = _startupWaitDefault;
+            if (cancelled.compareAndSet(false, true)) {
+                startupWait = DEFAULT_STARTUP_WAIT;
                 logger.debug("Startup task cancelled");
-                return super.cancel();
             }
             return true;
         }
 
         @Override
-        protected synchronized void runInContext() {
-            if (!cancelled) {
-                logger.info("The startup command is now cancelled");
-                cancelled = true;
-                _startup = null;
-                _startupWait = _startupWaitDefault * 2;
+        public void run() {
+            if (cancelled.compareAndSet(false, true)) {
+                logger.info("The running startup command is now invalid. 
Attempting reconnect");
+                startupTask.set(null);
+                startupWait = DEFAULT_STARTUP_WAIT * 2;
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Executing reconnect from task - {}", 
getLinkLog(_link));
+                }

Review Comment:
   ```suggestion
                   logger.debug("Executing reconnect from task - {}", () -> 
getLinkLog(_link));
   ```



##########
agent/src/main/java/com/cloud/agent/Agent.java:
##########
@@ -518,91 +568,88 @@ public Task create(final Task.Type type, final Link link, 
final byte[] data) {
         return new ServerHandler(type, link, data);
     }
 
-    protected void reconnect(final Link link) {
-        if (!_reconnectAllowed) {
+    protected void closeAndTerminateLink(final Link link) {
+        if (link == null) {
             return;
         }
-        synchronized (this) {
-            if (_startup != null) {
-                _startup.cancel();
-                _startup = null;
-            }
-        }
+        link.close();
+        link.terminated();
+    }
 
-        if (link != null) {
-            link.close();
-            link.terminated();
+    protected void stopAndCleanupConnection(boolean waitForStop) {
+        if (connection == null) {
+            return;
         }
-
-        setLink(null);
-        cancelTasks();
-
-        _resource.disconnected();
-
-        logger.info("Lost connection to host: {}. Attempting reconnection 
while we still have {} commands in progress.", _shell.getConnectedHost(), 
_inProgress.get());
-
-        _connection.stop();
-
+        connection.stop();
         try {
-            _connection.cleanUp();
+            connection.cleanUp();
         } catch (final IOException e) {
             logger.warn("Fail to clean up old connection. {}", e);
         }
-
-        while (_connection.isStartup()) {
-            _shell.getBackoffAlgorithm().waitBeforeRetry();
+        if (!waitForStop) {
+            return;
         }
+        do {
+            shell.getBackoffAlgorithm().waitBeforeRetry();
+        } while (connection.isStartup());
+    }
 
+    protected void reconnect(final Link link) {
+        if (!reconnectAllowed) {
+            logger.debug("Reconnect requested but it is not allowed {}", 
getLinkLog(link));

Review Comment:
   ```suggestion
               logger.debug("Reconnect requested but it is not allowed {}", () 
-> getLinkLog(link));
   ```



##########
server/src/main/java/com/cloud/deploy/DeploymentPlanningManagerImpl.java:
##########
@@ -742,12 +741,11 @@ protected boolean 
isAdminVmDeployableInDisabledResources() {
      * Adds disabled Hosts to the ExcludeList in order to avoid them at the 
deployment planner.
      */
     protected void avoidDisabledHosts(DataCenter dc, ExcludeList avoids) {
-        List<HostVO> disabledHosts = 
_hostDao.listDisabledByDataCenterId(dc.getId());
-        logger.debug(() -> String.format("Adding hosts [%s] of datacenter [%s] 
to the avoid set, because these hosts are in the Disabled state.",
-                
disabledHosts.stream().map(HostVO::getUuid).collect(Collectors.joining(", ")), 
dc.getUuid()));
-        for (HostVO host : disabledHosts) {
-            avoids.addHost(host.getId());
-        }
+
+        List<Long> disabledHostIds = 
_hostDao.listDisabledIdsByDataCenterId(dc.getId());
+        logger.debug("Adding hosts %s of datacenter [%s] to the avoid set, 
because these hosts are in the Disabled state.",
+                StringUtils.join(disabledHostIds), dc.getUuid());

Review Comment:
   ```suggestion
           logger.debug("Adding hosts {} of datacenter [{}] to the avoid set, 
because these hosts are in the Disabled state.", 
StringUtils.join(disabledHostIds), dc.getUuid());
   ```



##########
agent/src/main/java/com/cloud/agent/Agent.java:
##########
@@ -1174,34 +1207,32 @@ public PostCertificateRenewalTask(final Agent agent) {
         protected void runInContext() {
             while (true) {
                 try {
-                    if (_inProgress.get() == 0) {
+                    if (commandsInProgress.get() == 0) {
                         logger.debug("Running post certificate renewal task to 
restart services.");
 
                         // Let the resource perform any post certificate 
renewal cleanups
-                        _resource.executeRequest(new 
PostCertificateRenewalCommand());
+                        serverResource.executeRequest(new 
PostCertificateRenewalCommand());
 
-                        IAgentShell shell = agent._shell;
-                        ServerResource resource = 
agent._resource.getClass().newInstance();
+                        IAgentShell shell = agent.shell;
+                        ServerResource resource = 
agent.serverResource.getClass().getDeclaredConstructor().newInstance();
 
                         // Stop current agent
                         agent.cancelTasks();
-                        agent._reconnectAllowed = false;
-                        
Runtime.getRuntime().removeShutdownHook(agent._shutdownThread);
+                        agent.reconnectAllowed = false;
+                        
Runtime.getRuntime().removeShutdownHook(agent.shutdownThread);
                         agent.stop(ShutdownCommand.Requested, "Restarting due 
to new X509 certificates");
 
                         // Nullify references for GC
-                        agent._shell = null;
-                        agent._watchList = null;
-                        agent._shutdownThread = null;
-                        agent._controlListeners = null;
+                        agent.watchList = null;
+                        agent.shutdownThread = null;
+                        agent.controlListeners = null;

Review Comment:
   Should `shell` be nullified here?



##########
framework/db/src/main/java/com/cloud/utils/db/GenericDaoBase.java:
##########
@@ -1214,6 +1214,35 @@ public List<T> listAll(final Filter filter) {
         return executeList(sql.toString());
     }
 
+    private Object getIdObject() {
+        T entity = (T)_searchEnhancer.create();
+        try {
+            Method m = _entityBeanType.getMethod("getId");
+            return m.invoke(entity);
+        } catch (NoSuchMethodException | InvocationTargetException | 
IllegalAccessException ignored) {
+            logger.warn(String.format("Unable to get ID object for entity: 
%s", _entityBeanType.getSimpleName()));

Review Comment:
   ```suggestion
               logger.warn("Unable to get ID object for entity: {}", 
_entityBeanType.getSimpleName());
   ```



##########
server/src/main/java/com/cloud/hypervisor/kvm/discoverer/LibvirtServerDiscoverer.java:
##########
@@ -467,11 +484,9 @@ public HostVO createHostVOForConnectedAgent(HostVO host, 
StartupCommand[] cmd) {
             throw new IllegalArgumentException("cannot add host, due to can't 
find cluster: " + host.getClusterId());
         }
 
-        List<HostVO> hostsInCluster = 
_resourceMgr.listAllHostsInCluster(clusterVO.getId());
-        if (!hostsInCluster.isEmpty()) {
-            HostVO oneHost = hostsInCluster.get(0);
-            _hostDao.loadDetails(oneHost);
-            String hostOsInCluster = oneHost.getDetail("Host.OS");
+        HostVO existingHostInCluster = 
clusterExistingHostCache.get(clusterVO.getId());
+        if (existingHostInCluster != null) {
+            String hostOsInCluster = 
existingHostInCluster.getDetail("Host.OS");
             String hostOs = ssCmd.getHostDetails().get("Host.OS");

Review Comment:
   ```suggestion
               String hostOsInCluster = 
existingHostInCluster.getDetail(HostInfo.HOST_OS);
               String hostOs = ssCmd.getHostDetails().get(HostInfo.HOST_OS);
   ```



##########
engine/schema/src/main/java/com/cloud/vm/dao/VMInstanceDaoImpl.java:
##########
@@ -914,42 +946,109 @@ public List<VMInstanceVO> listStartingWithNoHostId() {
         return listBy(sc);
     }
 
+    protected List<VMInstanceVO> listSelectPowerStateByIds(final List<Long> 
ids) {
+        if (CollectionUtils.isEmpty(ids)) {
+            return new ArrayList<>();
+        }
+        SearchCriteria<VMInstanceVO> sc = IdsPowerStateSelectSearch.create();
+        sc.setParameters("id", ids.toArray());
+        return customSearch(sc, null);
+    }
+
+    protected Integer getPowerUpdateCount(final VMInstanceVO instance, final 
long powerHostId,
+              final VirtualMachine.PowerState powerState, Date wisdomEra) {
+        if (instance.getPowerStateUpdateTime() == null || 
instance.getPowerStateUpdateTime().before(wisdomEra)) {
+            Long savedPowerHostId = instance.getPowerHostId();
+            boolean isStateMismatch = instance.getPowerState() != powerState
+                    || savedPowerHostId == null
+                    || !savedPowerHostId.equals(powerHostId)
+                    || !isPowerStateInSyncWithInstanceState(powerState, 
powerHostId, instance);
+            if (isStateMismatch) {
+                return 1;
+            } else if (instance.getPowerStateUpdateCount() < 
MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) {
+                return instance.getPowerStateUpdateCount() + 1;
+            }
+        }
+        return null;
+    }
+
     @Override
-    public boolean updatePowerState(final long instanceId, final long 
powerHostId, final VirtualMachine.PowerState powerState, Date wisdomEra) {
-        return Transaction.execute(new TransactionCallback<>() {
-            @Override
-            public Boolean doInTransaction(TransactionStatus status) {
-                boolean needToUpdate = false;
-                VMInstanceVO instance = findById(instanceId);
-                if (instance != null
-                        && (null == instance.getPowerStateUpdateTime()
-                        || 
instance.getPowerStateUpdateTime().before(wisdomEra))) {
-                    Long savedPowerHostId = instance.getPowerHostId();
-                    if (instance.getPowerState() != powerState
-                            || savedPowerHostId == null
-                            || savedPowerHostId != powerHostId
-                            || 
!isPowerStateInSyncWithInstanceState(powerState, powerHostId, instance)) {
-                        instance.setPowerState(powerState);
-                        instance.setPowerHostId(powerHostId);
-                        instance.setPowerStateUpdateCount(1);
-                        
instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
-                        needToUpdate = true;
-                        update(instanceId, instance);
-                    } else {
-                        // to reduce DB updates, consecutive same state update 
for more than 3 times
-                        if (instance.getPowerStateUpdateCount() < 
MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) {
-                            
instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount() + 1);
-                            
instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
-                            needToUpdate = true;
-                            update(instanceId, instance);
-                        }
-                    }
+    public boolean updatePowerState(final long instanceId, final long 
powerHostId,
+            final VirtualMachine.PowerState powerState, Date wisdomEra) {
+        return Transaction.execute((TransactionCallback<Boolean>) status -> {
+            VMInstanceVO instance = findById(instanceId);
+            if (instance == null) {
+                return false;
+            }
+            // Check if we need to update based on powerStateUpdateTime
+            if (instance.getPowerStateUpdateTime() == null || 
instance.getPowerStateUpdateTime().before(wisdomEra)) {
+                Long savedPowerHostId = instance.getPowerHostId();
+                boolean isStateMismatch = instance.getPowerState() != 
powerState
+                        || savedPowerHostId == null
+                        || !savedPowerHostId.equals(powerHostId)
+                        || !isPowerStateInSyncWithInstanceState(powerState, 
powerHostId, instance);
+
+                if (isStateMismatch) {
+                    instance.setPowerState(powerState);
+                    instance.setPowerHostId(powerHostId);
+                    instance.setPowerStateUpdateCount(1);
+                } else if (instance.getPowerStateUpdateCount() < 
MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) {
+                    
instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount() + 1);
+                } else {
+                    // No need to update if power state is already in sync and 
count exceeded
+                    return false;
                 }
-                return needToUpdate;
+                instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+                update(instanceId, instance);
+                return true; // Return true since an update occurred
             }
+            return false;
         });
     }
 
+    @Override
+    public Map<Long, VirtualMachine.PowerState> updatePowerState(
+            final Map<Long, VirtualMachine.PowerState> instancePowerStates, 
long powerHostId, Date wisdomEra) {
+        Map<Long, VirtualMachine.PowerState> notUpdated = new HashMap<>();
+        List<VMInstanceVO> instances = listSelectPowerStateByIds(new 
ArrayList<>(instancePowerStates.keySet()));
+        Map<Long, Integer> updateCounts = new HashMap<>();
+        for (VMInstanceVO instance : instances) {
+            VirtualMachine.PowerState powerState = 
instancePowerStates.get(instance.getId());
+            Integer count = getPowerUpdateCount(instance, powerHostId, 
powerState, wisdomEra);
+            if (count != null) {
+                updateCounts.put(instance.getId(), count);
+            } else {
+                notUpdated.put(instance.getId(), powerState);
+            }
+        }
+        if (updateCounts.isEmpty()) {
+            return notUpdated;
+        }
+        StringBuilder sql = new StringBuilder("UPDATE `cloud`.`vm_instance` 
SET " +
+                "`power_host` = ?, `power_state_update_time` = now(), 
`power_state` = CASE ");
+        updateCounts.keySet().forEach(key -> {
+            sql.append("WHEN id = ").append(key).append(" THEN 
'").append(instancePowerStates.get(key)).append("' ");
+        });
+        sql.append("END, `power_state_update_count` = CASE ");
+        StringBuilder idList = new StringBuilder();
+        updateCounts.forEach((key, value) -> {
+            sql.append("WHEN `id` = ").append(key).append(" THEN 
").append(value).append(" ");
+            idList.append(key).append(",");
+        });
+        idList.setLength(idList.length() - 1);
+        sql.append("END WHERE `id` IN (").append(idList).append(")");
+        TransactionLegacy txn = TransactionLegacy.currentTxn();
+        try (PreparedStatement pstmt = 
txn.prepareAutoCloseStatement(sql.toString())) {
+            pstmt.setLong(1, powerHostId);
+            pstmt.executeUpdate();
+        } catch (SQLException e) {
+            logger.error(String.format("Unable to execute update power states 
SQL from VMs %s due to: %s",
+                    idList, e.getMessage()), e);

Review Comment:
   ```suggestion
               logger.error("Unable to execute update power states SQL from VMs 
{} due to: {}", idList, e.getMessage(), e);
   ```



##########
utils/src/main/java/com/cloud/utils/nio/NioConnection.java:
##########
@@ -95,8 +113,8 @@ public void start() throws NioConnectionException {
         try {
             init();
         } catch (final ConnectException e) {
-            logger.warn("Unable to connect to remote: is there a server 
running on port " + _port);
-            return;
+            logger.warn("Unable to connect to remote: is there a server 
running on port" + _port, e);

Review Comment:
   ```suggestion
               logger.warn("Unable to connect to remote: is there a server 
running on port {}?", _port, e);
   ```
   Maybe even raise the log level of this message to error.



##########
utils/src/main/java/com/cloud/utils/nio/NioConnection.java:
##########
@@ -202,57 +237,64 @@ protected void accept(final SelectionKey key) throws 
IOException {
             logger.trace("Connection accepted for " + socket);
         }
 
-        final SSLEngine sslEngine;
         try {
-            sslEngine = Link.initServerSSLEngine(caService, 
socketChannel.getRemoteAddress().toString());
-            sslEngine.setUseClientMode(false);
-            
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
             final NioConnection nioConnection = this;
-            _sslHandshakeExecutor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    _selector.wakeup();
-                    try {
-                        sslEngine.beginHandshake();
-                        if (!Link.doHandshake(socketChannel, sslEngine)) {
-                            throw new IOException("SSL handshake timed out 
with " + socketChannel.getRemoteAddress());
-                        }
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("SSL: Handshake done");
-                        }
-                        final InetSocketAddress saddr = 
(InetSocketAddress)socket.getRemoteSocketAddress();
-                        final Link link = new Link(saddr, nioConnection);
-                        link.setSSLEngine(sslEngine);
-                        link.setKey(socketChannel.register(key.selector(), 
SelectionKey.OP_READ, link));
-                        final Task task = _factory.create(Task.Type.CONNECT, 
link, null);
-                        registerLink(saddr, link);
-                        _executor.submit(task);
-                    } catch (IOException e) {
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("Connection closed due to failure: " 
+ e.getMessage());
-                        }
-                        closeAutoCloseable(socket, "accepting socket");
-                        closeAutoCloseable(socketChannel, "accepting 
socketChannel");
-                    } finally {
-                        _selector.wakeup();
+            _sslHandshakeExecutor.submit(() -> {
+                final InetSocketAddress socketAddress = 
(InetSocketAddress)socket.getRemoteSocketAddress();
+                activeAcceptConnections.incrementAndGet();
+                long startTime = System.currentTimeMillis();
+                _selector.wakeup();
+                try {
+                    final SSLEngine sslEngine = 
Link.initServerSSLEngine(caService, 
socketChannel.getRemoteAddress().toString());
+                    sslEngine.setUseClientMode(false);
+                    
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
+                    sslEngine.beginHandshake();
+                    if (!Link.doHandshake(socketChannel, sslEngine, 
getSslHandshakeTimeout())) {
+                        throw new IOException("SSL handshake timed out with " 
+ socketAddress);
+                    }
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("SSL: Handshake done");
                     }
+                    final Link link = new Link(socketAddress, nioConnection);
+                    link.setSSLEngine(sslEngine);
+                    link.setKey(socketChannel.register(key.selector(), 
SelectionKey.OP_READ, link));
+                    final Task task = _factory.create(Task.Type.CONNECT, link, 
null);
+                    registerLink(socketAddress, link);
+                    _executor.submit(task);
+                } catch (final GeneralSecurityException | IOException e) {
+                    if (logger.isTraceEnabled()) {
+                        logger.trace(socket.getRemoteSocketAddress()+ 
"Connection closed due to failure: " + e.getMessage());
+                    }
+                    closeAutoCloseable(socket, "accepting socket");
+                    closeAutoCloseable(socketChannel, "accepting 
socketChannel");
+                } finally {
+                    int connections = 
activeAcceptConnections.decrementAndGet();
+                    if (logger.isTraceEnabled()) {
+                        logger.trace(String.format("Accept task complete for 
%s - time taken: %d, " +
+                                        "active accept connections: %d",
+                                socketAddress, (System.currentTimeMillis() - 
startTime), connections));
+                    }

Review Comment:
   ```suggestion
                       logger.trace("Accept task complete for {} - time taken: 
{}, active accept connections: {}", () -> socketAddress,
                               () -> (System.currentTimeMillis() - startTime), 
() -> connections);
   ```



##########
engine/orchestration/src/main/java/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java:
##########
@@ -67,119 +68,121 @@ public void processHostVmStatePingReport(long hostId, 
Map<String, HostVmStateRep
         processReport(hostId, translatedInfo, force);
     }
 
-    private void processReport(long hostId, Map<Long, 
VirtualMachine.PowerState> translatedInfo, boolean force) {
-
-        logger.debug("Process VM state report. host: {}, number of records in 
report: {}.", hostId, translatedInfo.size());
-
-        for (Map.Entry<Long, VirtualMachine.PowerState> entry : 
translatedInfo.entrySet()) {
-
-            logger.debug("VM state report. host: {}, vm id: {}, power state: 
{}.", hostId, entry.getKey(), entry.getValue());
-
-            if (_instanceDao.updatePowerState(entry.getKey(), hostId, 
entry.getValue(), DateUtil.currentGMTTime())) {
-                logger.debug("VM state report is updated. host: {}, vm id: {}, 
power state: {}.", hostId, entry.getKey(), entry.getValue());
-
-                _messageBus.publish(null, 
VirtualMachineManager.Topics.VM_POWER_STATE, PublishScope.GLOBAL, 
entry.getKey());
-            } else {
-                logger.trace("VM power state does not change, skip DB writing. 
vm id: {}.", entry.getKey());
+    private void updateAndPublishVmPowerStates(long hostId, Map<Long, 
VirtualMachine.PowerState> instancePowerStates,
+           Date updateTime) {
+        if (instancePowerStates.isEmpty()) {
+            return;
+        }
+        Set<Long> vmIds = instancePowerStates.keySet();
+        Map<Long, VirtualMachine.PowerState> notUpdated = 
_instanceDao.updatePowerState(instancePowerStates, hostId,
+                updateTime);
+        if (notUpdated.size() <= vmIds.size()) {
+            for (Long vmId : vmIds) {
+                if (!notUpdated.isEmpty() && !notUpdated.containsKey(vmId)) {
+                    logger.debug("VM state report is updated. host: {}, vm id: 
{}}, power state: {}}",
+                            hostId, vmId, instancePowerStates.get(vmId));
+                    _messageBus.publish(null, 
VirtualMachineManager.Topics.VM_POWER_STATE,
+                            PublishScope.GLOBAL, vmId);
+                    continue;
+                }
+                logger.trace("VM power state does not change, skip DB writing. 
vm id: {}", vmId);
             }
         }
+    }
 
+    private void processMissingVmReport(long hostId, Set<Long> vmIds, boolean 
force) {
         // any state outdates should be checked against the time before this 
list was retrieved
         Date startTime = DateUtil.currentGMTTime();
         // for all running/stopping VMs, we provide monitoring of missing 
report
-        List<VMInstanceVO> vmsThatAreMissingReport = 
_instanceDao.findByHostInStates(hostId, VirtualMachine.State.Running,
-                VirtualMachine.State.Stopping, VirtualMachine.State.Starting);
-        java.util.Iterator<VMInstanceVO> it = 
vmsThatAreMissingReport.iterator();
-        while (it.hasNext()) {
-            VMInstanceVO instance = it.next();
-            if (translatedInfo.get(instance.getId()) != null)
-                it.remove();
-        }
-
+        List<VMInstanceVO> vmsThatAreMissingReport = 
_instanceDao.findByHostInStatesExcluding(hostId, vmIds,
+                VirtualMachine.State.Running, VirtualMachine.State.Stopping, 
VirtualMachine.State.Starting);
         // here we need to be wary of out of band migration as opposed to 
other, more unexpected state changes
-        if (vmsThatAreMissingReport.size() > 0) {
-            Date currentTime = DateUtil.currentGMTTime();
-            logger.debug("Run missing VM report. current time: {}", 
currentTime.getTime());
-
-            // 2 times of sync-update interval for graceful period
-            long milliSecondsGracefullPeriod = 
mgmtServiceConf.getPingInterval() * 2000L;
-
-            for (VMInstanceVO instance : vmsThatAreMissingReport) {
-
-                // Make sure powerState is up to date for missing VMs
-                try {
-                    if (!force && 
!_instanceDao.isPowerStateUpToDate(instance.getId())) {
-                        logger.warn("Detected missing VM but power state is 
outdated, wait for another process report run for VM id: {}.", 
instance.getId());
-                        
_instanceDao.resetVmPowerStateTracking(instance.getId());
-                        continue;
-                    }
-                } catch (CloudRuntimeException e) {
-                    logger.warn("Checked for missing powerstate of a none 
existing vm", e);
-                    continue;
-                }
+        if (vmsThatAreMissingReport.isEmpty()) {
+            return;
+        }
+        Date currentTime = DateUtil.currentGMTTime();
+        if (logger.isDebugEnabled()) {
+            logger.debug("Run missing VM report. current time: " + 
currentTime.getTime());
+        }
+        if (!force) {
+            List<Long> outdatedVms = vmsThatAreMissingReport.stream()
+                    .filter(v -> !_instanceDao.isPowerStateUpToDate(v))
+                    .map(VMInstanceVO::getId)
+                    .collect(Collectors.toList());
+            _instanceDao.resetVmPowerStateTracking(outdatedVms);
+            vmsThatAreMissingReport = vmsThatAreMissingReport.stream()
+                    .filter(v -> !outdatedVms.contains(v.getId()))
+                    .collect(Collectors.toList());
+        }
 
-                Date vmStateUpdateTime = instance.getPowerStateUpdateTime();
+        // 2 times of sync-update interval for graceful period
+        long milliSecondsGracefulPeriod = mgmtServiceConf.getPingInterval() * 
2000L;
+        Map<Long, VirtualMachine.PowerState> instancePowerStates = new 
HashMap<>();
+        for (VMInstanceVO instance : vmsThatAreMissingReport) {
+            Date vmStateUpdateTime = instance.getPowerStateUpdateTime();
+            if (vmStateUpdateTime == null) {
+                logger.warn("VM power state update time is null, falling back 
to update time for vm id: " + instance.getId());
+                vmStateUpdateTime = instance.getUpdateTime();
                 if (vmStateUpdateTime == null) {
-                    logger.warn("VM power state update time is null, falling 
back to update time for vm id: {}.", instance.getId());
-                    vmStateUpdateTime = instance.getUpdateTime();
-                    if (vmStateUpdateTime == null) {
-                        logger.warn("VM update time is null, falling back to 
creation time for vm id: {}", instance.getId());
-                        vmStateUpdateTime = instance.getCreated();
-                    }
+                    logger.warn("VM update time is null, falling back to 
creation time for vm id: " + instance.getId());
+                    vmStateUpdateTime = instance.getCreated();
                 }
-
-                String lastTime = new 
SimpleDateFormat("yyyy/MM/dd'T'HH:mm:ss.SSS'Z'").format(vmStateUpdateTime);
-                logger.debug("Detected missing VM. host: {}, vm id: {}({}), 
power state: {}, last state update: {}"
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug(
+                        String.format("Detected missing VM. host: %d, vm id: 
%d(%s), power state: %s, last state update: %s"
                                 , hostId
                                 , instance.getId()
                                 , instance.getUuid()
                                 , VirtualMachine.PowerState.PowerReportMissing
-                                , lastTime);
-
-                long milliSecondsSinceLastStateUpdate = currentTime.getTime() 
- vmStateUpdateTime.getTime();
-
-                if (force || milliSecondsSinceLastStateUpdate > 
milliSecondsGracefullPeriod) {
-                    logger.debug("vm id: {} - time since last state 
update({}ms) has passed graceful period.", instance.getId(), 
milliSecondsSinceLastStateUpdate);
-
-                    // this is were a race condition might have happened if we 
don't re-fetch the instance;
-                    // between the startime of this job and the currentTime of 
this missing-branch
-                    // an update might have occurred that we should not 
override in case of out of band migration
-                    if (_instanceDao.updatePowerState(instance.getId(), 
hostId, VirtualMachine.PowerState.PowerReportMissing, startTime)) {
-                        logger.debug("VM state report is updated. host: {}, vm 
id: {}, power state: PowerReportMissing.", hostId, instance.getId());
-
-                        _messageBus.publish(null, 
VirtualMachineManager.Topics.VM_POWER_STATE, PublishScope.GLOBAL, 
instance.getId());
-                    } else {
-                        logger.debug("VM power state does not change, skip DB 
writing. vm id: {}", instance.getId());
-                    }
-                } else {
-                    logger.debug("vm id: {} - time since last state 
update({}ms) has not passed graceful period yet.", instance.getId(), 
milliSecondsSinceLastStateUpdate);
-                }
+                                , 
DateUtil.getOutputString(vmStateUpdateTime)));
+            }
+            long milliSecondsSinceLastStateUpdate = currentTime.getTime() - 
vmStateUpdateTime.getTime();
+            if (force || (milliSecondsSinceLastStateUpdate > 
milliSecondsGracefulPeriod)) {
+                logger.debug("vm id: {} - time since last state update({} ms) 
has passed graceful period",
+                        instance.getId(), milliSecondsSinceLastStateUpdate);
+                // this is where a race condition might have happened if we 
don't re-fetch the instance;
+                // between the startime of this job and the currentTime of 
this missing-branch
+                // an update might have occurred that we should not override 
in case of out of band migration
+                instancePowerStates.put(instance.getId(), 
VirtualMachine.PowerState.PowerReportMissing);
+            } else {
+                logger.debug("vm id: {} - time since last state update({} ms) 
has not passed graceful period yet",
+                        instance.getId(), milliSecondsSinceLastStateUpdate);
             }
         }
+        updateAndPublishVmPowerStates(hostId, instancePowerStates, startTime);
+    }
+
+    private void processReport(long hostId, Map<Long, 
VirtualMachine.PowerState> translatedInfo, boolean force) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Process VM state report. Host: {}, number of records 
in report: {}. VMs: [{}]",
+                    hostId,
+                    translatedInfo.size(),
+                    translatedInfo.entrySet().stream().map(entry -> 
entry.getKey() + ":" + entry.getValue())
+                            .collect(Collectors.joining(", ")) + "]");
+        }

Review Comment:
   This could be changed to use lazy logging:
   ```suggestion
           logger.debug("Process VM state report. Host: {}, number of records 
in report: {}. VMs: [{}]", () -> hostId, () -> translatedInfo.size(),
                   () -> translatedInfo.entrySet().stream().map(entry -> 
entry.getKey() + ":" + entry.getValue())
                           .collect(Collectors.joining(", ")) + "]");
   ```



##########
server/src/main/java/com/cloud/alert/AlertManagerImpl.java:
##########
@@ -257,6 +259,64 @@ public void sendAlert(AlertType alertType, long 
dataCenterId, Long podId, String
         }
     }
 
+    protected void recalculateHostCapacities() {
+        // Calculate CPU and RAM capacities
+        List<Long> hostIds = hostDao.listIdsByType(Host.Type.Routing);
+        if (hostIds.isEmpty()) {
+            return;
+        }
+        ConcurrentHashMap<Long, Future<Void>> futures = new 
ConcurrentHashMap<>();
+        ExecutorService executorService = 
Executors.newFixedThreadPool(Math.max(1,
+                Math.min(CapacityManager.CapacityCalculateWorkers.value(), 
hostIds.size())));
+        for (Long hostId : hostIds) {
+            futures.put(hostId, executorService.submit(() -> {
+                final HostVO host = hostDao.findById(hostId);
+                _capacityMgr.updateCapacityForHost(host);
+                return null;
+            }));
+        }
+        for (Map.Entry<Long, Future<Void>> entry: futures.entrySet()) {
+            try {
+                entry.getValue().get();
+            } catch (InterruptedException | ExecutionException e) {
+                logger.error(String.format("Error during capacity calculation 
for host: %d due to : %s",
+                        entry.getKey(), e.getMessage()), e);
+            }
+        }
+        executorService.shutdown();
+    }
+
+    protected void recalculateStorageCapacities() {
+        List<Long> storagePoolIds = _storagePoolDao.listAllIds();

Review Comment:
   Does this return removed storage pools? From an operator's perspective, I think 
the capacities of pools in maintenance or in the disabled state are useful; 
isn't that the default behavior?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to