Copilot commented on code in PR #13345:
URL: https://github.com/apache/cloudstack/pull/13345#discussion_r3354649209


##########
framework/db/src/main/java/com/cloud/utils/db/GlobalLock.java:
##########
@@ -45,127 +43,248 @@
   * </p>
   */
 public class GlobalLock {
-    protected Logger logger = LogManager.getLogger(getClass());
+    protected final static Logger logger = LogManager.getLogger(GlobalLock.class);
 
     private String name;
-    private int lockCount = 0;
-    private Thread ownerThread = null;
-
-    private int referenceCount = 0;
-    private long holdingStartTick = 0;
-
-    private static Map<String, GlobalLock> s_lockMap = new HashMap<String, GlobalLock>();
 
+    /**
+     * DB lock count.
+     * Increments on {@link GlobalLock#lock(int)} and decrements on {@link GlobalLock#unlock()}.
+     * Upon {@link GlobalLock#unlock()}, if {@link GlobalLock#lockCount} is less than 1, then lock removed from DB
+     */
+    private int lockCount;
+
+    /**
+     * Internal (in-memory) lock count.
+     * Increments on {@link GlobalLock#addRef()} and indirectly on {@link GlobalLock#getInternLock(String)} and
+     * decrements on {@link GlobalLock#releaseRef()}, {@link GlobalLock#unlock()} and on {@link GlobalLock#lock(int)}
+     * if DB lock is unsuccessful
+     */
+    private int referenceCount;
+
+    /**
+     * Thread that owns lock. If lock called from different thread, it will be waiting for the owner to unlock it
+     * within requested timeout. If owner thread call {@link GlobalLock#lock(int)} again, then
+     * {@link GlobalLock#lockCount} will be incremented.
+     * If {@link GlobalLock#unlock()} called by owner thread, or DB lock will be unsuccessful, then owner thread will be
+     * nullified.
+     */
+    private Thread ownerThread;
+
+    /**
+     * Variable to hold lock duration in milliseconds. Used for information only.
+     */
+    private long holdingStartTick;
+
+    /**
+     * Holds all created locks.
+     */
+    private static Map<String, GlobalLock> s_lockMap = new HashMap<>();
+
+    /**
+     * Create lock.
+     *
+     * @param name lock name
+     */
     private GlobalLock(String name) {
         this.name = name;
     }
 
+    /**
+     * Increment reference count to lock.
+     *
+     * @return reference count
+     */
     public int addRef() {
         synchronized (this) {
             referenceCount++;
             return referenceCount;
         }
     }
 
+    /**
+     * Decrement reference count to lock.
+     *
+     * @return reference count
+     */
     public int releaseRef() {
-        int refCount;
-
         boolean needToRemove = false;
         synchronized (this) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Releasing reference for internal lock {}, reference count: {}, lock count: {}",
+                        name, referenceCount, lockCount);
+            }
             referenceCount--;
-            refCount = referenceCount;
-
-            if (referenceCount < 0)
-                logger.warn("Unmatched Global lock " + name + " reference usage detected, check your code!");
 
-            if (referenceCount == 0)
+            if (referenceCount < 0) {
+                logger.warn("Unmatched internal lock {} reference usage detected (reference count: {}, " +
+                        "lock count: {}), check your code!", name, referenceCount, lockCount);
+            } else if (referenceCount < 1) {
                 needToRemove = true;
+            }
         }
 
-        if (needToRemove)
+        if (needToRemove) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Need to release internal lock {}", name);
+            }
             releaseInternLock(name);
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Released reference for lock {}, reference count: {}", name, referenceCount);
+        }
+        return referenceCount;
+    }
 
-        return refCount;
+    public static boolean isLockAvailable(String name) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Checking lock present for {}", name);
+        }
+        boolean result = false;
+        try {
+            result = DbUtil.isFreeLock(name);
+        } finally {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Result of checking lock present for {}: {}", name, result);
+            }
+        }
+        return result;

Review Comment:
   GlobalLock.isLockAvailable() delegates to DbUtil.isFreeLock(), but its debug messages say "lock present", which is the opposite of what IS_FREE_LOCK() reports (in MySQL, IS_FREE_LOCK() returns 1 when no session holds the lock). This makes lock-availability troubleshooting confusing.
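
A minimal sketch of log wording that follows IS_FREE_LOCK() semantics (true means no session holds the lock). `LockAvailability` and its `Predicate<String>` parameter are hypothetical stand-ins for `GlobalLock` and `DbUtil.isFreeLock()`, used so the snippet runs without a database; `System.out` replaces the Log4j logger. The same inverted wording appears in `getConnectStatusAnswer()` ("Global lock %s is%s present"), which could share such a helper.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for GlobalLock.isLockAvailable(); the Predicate plays the role of DbUtil.isFreeLock().
final class LockAvailability {
    private LockAvailability() {
    }

    // Wording follows IS_FREE_LOCK(): "free" when no session holds the lock, "in use" otherwise.
    static String describe(String name, boolean free) {
        return String.format("Lock %s is %s", name, free ? "free" : "in use");
    }

    // Checks the lock, then logs the result with wording that matches the returned value.
    static boolean isLockAvailable(String name, Predicate<String> isFreeLock) {
        System.out.println("Checking whether lock " + name + " is free");
        boolean free = isFreeLock.test(name);
        System.out.println(describe(name, free));
        return free;
    }
}
```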



##########
engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java:
##########
@@ -1544,24 +2009,26 @@ protected void runInContext() {
     }
 
     protected void connectAgent(final Link link, final Command[] cmds, final Request request) {
-        // send startupanswer to agent in the very beginning, so agent can move on without waiting for the answer for an undetermined time, if we put this logic into another
-        // thread pool.
-        final StartupAnswer[] answers = new StartupAnswer[cmds.length];
+        // send startup answer to agent in the very beginning, so agent can move on without waiting for the answer for an undetermined time,
+        // if we put this logic into another thread pool.
+        Map<String, String> backoffConfiguration = ConfigKeyUtil.toMap(BackoffConfiguration.value());
+        StartupAnswer[] answers = new StartupAnswer[cmds.length];
         Command cmd;
         for (int i = 0; i < cmds.length; i++) {
             cmd = cmds[i];
-            if (cmd instanceof StartupRoutingCommand || cmd instanceof StartupProxyCommand || cmd instanceof StartupSecondaryStorageCommand ||
-                    cmd instanceof StartupStorageCommand) {
-                answers[i] = new StartupAnswer((StartupCommand) cmds[i], 0, "", "", mgmtServiceConf.getPingInterval());
-                break;
+            if (cmd instanceof StartupRoutingCommand || cmd instanceof StartupProxyCommand || cmd instanceof StartupSecondaryStorageCommand
+                    || cmd instanceof StartupStorageCommand) {
+                StartupAnswer answer = new StartupAnswer((StartupCommand) cmds[i], 0, "", "", mgmtServiceConf.getPingInterval());
+                answer.setParams(backoffConfiguration);
+                answer.setAgentHostStatusCheckDelaySec(AgentHostStatusCheckDelay.value());
+                answers[i] = answer;
             }
         }
-        Response response;
-        response = new Response(request, answers[0], _nodeId, -1);
+        Response response = new Response(request, answers, _nodeId, -1);
         try {

Review Comment:
   connectAgent() builds a StartupAnswer[] aligned to the incoming command 
indexes, but Response.getAnswer() always returns answers[0]. If the 
Startup*Command is not at index 0 (or if answers[0] remains null), the agent 
will not treat the response as a StartupAnswer and the connect handshake can 
fail silently.
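
One possible shape of a fix, assuming the agent keeps reading only the first answer: select the first non-null entry explicitly instead of relying on index 0. `StartupAnswerSelector` is hypothetical, and the type parameter stands in for `StartupAnswer`.

```java
import java.util.Optional;

// Hypothetical helper; T stands in for StartupAnswer.
final class StartupAnswerSelector {
    private StartupAnswerSelector() {
    }

    // Returns the answer built for the first Startup*Command, whatever its index was,
    // or Optional.empty() when no slot was filled.
    static <T> Optional<T> firstAnswer(T[] answers) {
        if (answers == null) {
            return Optional.empty();
        }
        for (T answer : answers) {
            if (answer != null) {
                return Optional.of(answer);
            }
        }
        return Optional.empty();
    }
}
```

The selected answer could then be passed to the single-answer `Response(request, answer, _nodeId, -1)` constructor that the removed code used; an empty result means no Startup*Command was present and should be handled explicitly rather than sent as a null answer.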



##########
engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java:
##########
@@ -454,6 +580,45 @@ private AgentControlAnswer handleControlCommand(final AgentAttache attache, fina
         return new AgentControlAnswer(cmd);
     }
 
+    private AgentConnectStatusAnswer handleAgentConnectStatusCommand(AgentAttache attache, AgentConnectStatusCommand cmd) {
+        HostVO hostVo = _hostDao.findById(attache.getId());
+        return getConnectStatusAnswer(hostVo, cmd);
+    }
+
+    private AgentConnectStatusAnswer getConnectStatusAnswer(HostVO hostVo, AgentConnectStatusCommand cmd) {
+        long hostId = hostVo.getId();
+        String hostName = hostVo.getName();
+        String lockName = getHostJoinLockName(hostId);
+        Status status = hostVo.getStatus();
+        try {
+            boolean lockAvailable = GlobalLock.isLockAvailable(lockName);
+            String details = String.format("Global lock %s is%s present for %s", lockName, lockAvailable ? "" : " not",
+                    hostName);
+            logger.debug(details);
+            return getAgentConnectStatusAnswer(cmd, lockName, hostName, lockAvailable, status, details);
+        } catch (RuntimeException e) {
+            String msg = String.format("Failed to check global lock %s presence for %s: %s", lockName, hostName,
+                    e.getMessage());
+            logger.warn(msg, e);
+            return new AgentConnectStatusAnswer(cmd, false, msg);
+        }

Review Comment:
   getConnectStatusAnswer() dereferences hostVo without a null-check 
(hostVo.getId()/getName()), but hostVo is sourced from HostDao lookups that can 
return null. This would turn an AgentConnectStatusCommand into an NPE and 
prevent the agent from determining whether it can proceed with StartupCommand.
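
A minimal sketch of the guard, assuming a missing host should produce a failed answer rather than an exception. `Host` and `StatusAnswer` are hypothetical stand-ins for `HostVO` and `AgentConnectStatusAnswer`; in the real code the check would sit before the first `hostVo` getter, reusing the existing `new AgentConnectStatusAnswer(cmd, false, msg)` failure path.

```java
import java.util.Objects;

final class ConnectStatusCheck {
    // Hypothetical stand-in for HostVO with only the fields the check reads.
    static final class Host {
        final long id;
        final String name;

        Host(long id, String name) {
            this.id = id;
            this.name = Objects.requireNonNull(name, "name");
        }
    }

    // Hypothetical stand-in for AgentConnectStatusAnswer(cmd, result, details).
    static final class StatusAnswer {
        final boolean result;
        final String details;

        StatusAnswer(boolean result, String details) {
            this.result = result;
            this.details = details;
        }
    }

    private ConnectStatusCheck() {
    }

    // Guards the lookup result before any getter is called on it.
    static StatusAnswer answerFor(long attacheId, Host host) {
        if (host == null) {
            return new StatusAnswer(false,
                    String.format("Host with id %d not found, cannot check connect status", attacheId));
        }
        // The real method would go on to check the global lock for this host here.
        return new StatusAnswer(true, String.format("Checked connect status for %s (id %d)", host.name, host.id));
    }
}
```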



##########
agent/src/test/java/com/cloud/agent/HostConnectProcessTest.java:
##########
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.agent;
+
+import com.cloud.exception.CloudException;
+import com.cloud.utils.nio.Link;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import javax.naming.ConfigurationException;
+
+import static org.mockito.Mockito.mock;
+
+@RunWith(MockitoJUnitRunner.class)
+public class HostConnectProcessTest {
+
+    private Agent agent;
+    private Logger logger;
+    private Link link;
+    private ServerAttache attache;
+    private HostConnectProcess hostConnectProcess;
+    private boolean connectionTransfer;
+
+    @Before
+    public void setUp() throws ConfigurationException {
+        agent = mock(Agent.class);
+        logger = mock(Logger.class);
+        link = mock(Link.class);
+        attache = mock(ServerAttache.class);
+        hostConnectProcess = new HostConnectProcess(agent);
+        ReflectionTestUtils.setField(agent, "logger", logger);
+    }
+
+    @Test
+    public void testScheduleConnectProcess() throws InterruptedException, CloudException {
+
+        hostConnectProcess.scheduleConnectProcess(link, connectionTransfer);
+        Assert.assertTrue(hostConnectProcess.isInProgress());
+    }
+}

Review Comment:
   HostConnectProcessTest schedules a task on a ScheduledExecutorService created by HostConnectProcess (NamedThreadFactory creates non-daemon threads) but never shuts the executor down. This can leak threads across the test suite and cause tests to hang.
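
In the test class the fix is probably an `@After` method that calls `hostConnectProcess.stop()`, since `stop()` already calls `shutdownNow()` on the executor. The JDK-only sketch below shows the lifecycle such a teardown has to guarantee, using a hypothetical `ScheduledWork` class in place of `HostConnectProcess`: the scheduled task is cancelled, the non-daemon worker is interrupted, and the executor is confirmed terminated.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical class mirroring the executor lifecycle in HostConnectProcess.
final class ScheduledWork implements AutoCloseable {
    // Non-daemon worker threads keep the JVM (and a test run) alive until shut down.
    private static final ThreadFactory NON_DAEMON = runnable -> {
        Thread thread = new Thread(runnable, "scheduled-work");
        thread.setDaemon(false);
        return thread;
    };

    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, NON_DAEMON);
    private volatile ScheduledFuture<?> future;

    void start() {
        future = executor.scheduleWithFixedDelay(() -> { }, 10, 10, TimeUnit.SECONDS);
    }

    boolean isInProgress() {
        ScheduledFuture<?> current = future;
        return current != null && !current.isCancelled();
    }

    boolean isTerminated() {
        return executor.isTerminated();
    }

    // What an @After hook should trigger: cancel the task, interrupt workers, wait briefly.
    @Override
    public void close() {
        ScheduledFuture<?> current = future;
        if (current != null) {
            current.cancel(true);
        }
        executor.shutdownNow();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                System.err.println("Executor did not terminate within 5 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```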



##########
agent/src/main/java/com/cloud/agent/HostConnectProcess.java:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.agent;
+
+import com.cloud.agent.api.AgentConnectStatusAnswer;
+import com.cloud.agent.api.AgentConnectStatusCommand;
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.Command;
+import com.cloud.agent.api.StartupAnswer;
+import com.cloud.agent.api.StartupCommand;
+import com.cloud.agent.properties.AgentProperties;
+import com.cloud.agent.properties.AgentPropertiesFileHandler;
+import com.cloud.agent.transport.Request;
+import com.cloud.exception.CloudException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.host.Status;
+import com.cloud.resource.ResourceStatusUpdater;
+import com.cloud.resource.ServerResource;
+import com.cloud.utils.concurrency.NamedThreadFactory;
+import com.cloud.utils.nio.Link;
+import org.apache.cloudstack.threadcontext.ThreadContextUtil;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.ThreadContext;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+
+public class HostConnectProcess {
+    private static final Logger logger = LogManager.getLogger(HostConnectProcess.class);
+
+    public static final int DEFAULT_ASYNC_COMMAND_TIMEOUT_SEC =
+            AgentPropertiesFileHandler.getPropertyValue(AgentProperties.ASYNC_COMMAND_TIMEOUT_SEC);
+
+    public static final int DEFAULT_ASYNC_STARTUP_COMMAND_TIMEOUT_SEC =
+            AgentPropertiesFileHandler.getPropertyValue(AgentProperties.ASYNC_STARTUP_COMMAND_TIMEOUT_SEC);
+
+    static final long HOST_STATUS_CHECK_INITIAL_DELAY_SEC = 10;
+    private long hostStatusCheckDelaySec = AgentPropertiesFileHandler.getPropertyValue(AgentProperties.AGENT_HOST_STATUS_CHECK_DELAY_SEC);
+    private final AtomicReference<ScheduledFuture<?>> hostStatusFutureRef = new AtomicReference<>();
+    private final Agent agent;
+    private ScheduledExecutorService hostStatusExecutor;
+
+    public HostConnectProcess(Agent agent) {
+        this.agent = agent;
+        initExecutors();
+    }
+
+    private void initExecutors() {
+        stop();
+        var threadFactory = new NamedThreadFactory("Agent-" + HostStatusTask.class.getSimpleName());
+        hostStatusExecutor = Executors.newScheduledThreadPool(1, threadFactory);
+    }
+
+    /**
+     * Stops the whole connect process and cancels all scheduled asynchronous tasks.
+     * Returns {@link Boolean#TRUE} if {@link HostConnectProcess} was waiting for {@link StartupAnswer}.
+     */
+    public boolean stop() {
+        logger.debug("Stopping connect process. The process is active: {}", isInProgress());
+        stopHostStatusExecutor();
+        logger.debug("Stopped executor");
+        Optional<? extends ScheduledFuture<?>> hostStatusOpt = Optional.ofNullable(hostStatusFutureRef.getAndSet(null))
+                .filter(Predicate.not(ScheduledFuture::isCancelled));
+
+        hostStatusOpt.ifPresent(future -> future.cancel(true));
+        logger.debug("Cancelled future");
+
+        return hostStatusOpt.isPresent();
+    }
+
+    private void stopHostStatusExecutor() {
+        if (hostStatusExecutor != null) {
+            hostStatusExecutor.shutdownNow();
+            hostStatusExecutor = null;
+        }
+    }
+
+    public void scheduleConnectProcess(Link link, boolean connectionTransfer) {
+        logger.debug("Scheduling connect process for {}", link);
+        initExecutors();
+
+        var task = new HostStatusTask(link, connectionTransfer, agent, hostStatusFutureRef);
+        var future = hostStatusExecutor.scheduleWithFixedDelay(ThreadContextUtil.wrapThreadContext(task),
+                HOST_STATUS_CHECK_INITIAL_DELAY_SEC,
+                hostStatusCheckDelaySec, TimeUnit.SECONDS);
+        hostStatusFutureRef.set(future);
+    }
+
+    /**
+     * Returns {@link Boolean#TRUE} if {@link HostStatusTask} created and scheduled.
+     * That means there is already {@link Status#Connecting} process is running.
+     */
+    public boolean isInProgress() {
+        return Optional.ofNullable(hostStatusFutureRef.get())
+                .filter(Predicate.not(ScheduledFuture::isCancelled)).isPresent();
+    }
+
+    public void updateHostStatusCheckDelay(int newDelaySec) {
+        logger.info("Updating host status check delay from {} to {} seconds", hostStatusCheckDelaySec, newDelaySec);
+        this.hostStatusCheckDelaySec = newDelaySec;
+    }
+
+    /**
+     * Task wait for the Host to be available to connect to submit {@link StartupCommand}.
+     * Checks Host status on Management Server cluster and submit {@link StartupCommand} only if there is no lock and
+     * Host is not {@link Status#Connecting}.
+     */
+    public static class HostStatusTask implements Runnable, AsyncSend {
+        private final Set<Status> operationalStatuses = Set.of(Status.Connecting, Status.Up, Status.Rebalancing);
+
+        private final Link _link;
+        private final boolean _forceConnect;
+        private final Agent _agent;
+        private final AtomicReference<? extends ScheduledFuture<?>> _futureRef;
+
+        public HostStatusTask(Link link, boolean forceConnect, Agent agent,
+                              AtomicReference<? extends ScheduledFuture<?>> futureRef) {
+            logger.debug("{} created", this.getClass().getSimpleName());
+            _link = link;
+            _forceConnect = forceConnect;
+            _agent = agent;
+            _futureRef = futureRef;
+        }
+
+        private void cancel() {
+            logger.debug("Cancelling future");
+            Optional.ofNullable(_futureRef.get())
+                    .filter(Predicate.not(ScheduledFuture::isCancelled))
+                    .ifPresent(future -> future.cancel(true));
+            logger.debug("Cancelled future");
+        }
+
+        @Override
+        public void run() {
+            try {
+                logger.debug("Running {}", getClass().getSimpleName());
+                runInternal();
+            } catch (Exception e) {
+                logger.error("Failed to run {}", getClass().getSimpleName(), e);
+            }
+        }
+
+        private void runInternal() {
+            ServerAttache attache = (ServerAttache) _link.attachment();
+            if (attache == null || attache.getLink() == null) {
+                cancel();
+                return;
+            }
+
+            AgentConnectStatusAnswer answer;
+            try {
+                answer = getAgentConnectStatusAnswer(attache);
+            } catch (IOException e) {
+                cancel();
+                logger.error("The connection to {} interrupted, restarting the whole process", _link, e);
+                _agent.getRequestHandler().submit(() -> _agent.reconnect(_link, null, _forceConnect));
+                return;
+            }
+            if (answer == null) {
+                logger.warn("Received empty agent connect status answer, will retry later");
+                return;
+            }
+            Boolean lockAvailable = answer.isLockAvailable();
+            Status status = answer.getHostStatus();
+            if (Boolean.TRUE.equals(lockAvailable)) {
+                // send startup command here
+                logger.info("There is no lock and Host status is {}", status);
+                try {
+                    sendStartupCommand(_link, _forceConnect);
+                    logger.debug("Sending startup command to {} finished", _link);
+                    cancel();
+                    logger.debug("Unscheduled {}", getClass().getSimpleName());
+                } catch (RuntimeException e) {
+                    logger.error("Failed to send startup command to {}", _link, e);
+                } catch (IOException e) {
+                    cancel();
+                    logger.error("The connection to {} interrupted, restarting the whole process", _link, e);
+                    _agent.getRequestHandler().submit(() -> _agent.reconnect(_link, null, _forceConnect));
+                }
+            } else {
+                logger.info("There is lock and Host status is {}, will retry later", status);
+            }
+        }
+
+        private AgentConnectStatusAnswer getAgentConnectStatusAnswer(ServerAttache attache) throws IOException {
+            AgentConnectStatusCommand command = _agent.setupAgentConnectStatusCommand(new AgentConnectStatusCommand());
+            var commands = new Command[]{command};
+            try {
+                return send(attache, commands, AgentConnectStatusAnswer.class, DEFAULT_ASYNC_COMMAND_TIMEOUT_SEC);
+            } catch (RuntimeException e) {
+                String commandName = commands[0].getClass().getSimpleName();
+                logger.error("Failed to retrieve {}, will retry later", commandName, e);
+                return null;
+            }
+        }
+
+        public void sendStartupCommand(Link link, boolean connectionTransfer) throws IOException {
+            ServerAttache attache = (ServerAttache) link.attachment();
+            if (attache == null || attache.getLink() == null) {
+                return;
+            }
+            ServerResource serverResource = _agent.getResource();
+            StartupCommand[] startup = serverResource.initialize();
+            if (ArrayUtils.isEmpty(startup)) {

Review Comment:
   HostStatusTask.sendStartupCommand() ignores the connectionTransfer flag when 
building StartupCommand[] (it calls serverResource.initialize() instead of 
initialize(connectionTransfer)). Several ServerResource implementations 
override initialize(boolean) and may rely on this flag during initialization, 
not just for setting StartupCommand.connectionTransferred.
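
A minimal sketch of forwarding the flag. `StartupSource` is a hypothetical, trimmed-down stand-in for `ServerResource`, and its default `initialize(boolean)` that delegates to `initialize()` is an assumption made so the sketch compiles; the point is only that the caller passes `connectionTransfer` through, so resources that override `initialize(boolean)` actually see it.

```java
// Hypothetical, trimmed-down stand-in for ServerResource.
interface StartupSource {
    String[] initialize();

    // Assumed default so the sketch compiles; resources that care about the flag override it.
    default String[] initialize(boolean connectionTransfer) {
        return initialize();
    }
}

// Hypothetical caller mirroring HostStatusTask.sendStartupCommand().
final class StartupCommandBuilder {
    private StartupCommandBuilder() {
    }

    // Forwards connectionTransfer instead of calling the no-argument overload.
    static String[] build(StartupSource source, boolean connectionTransfer) {
        return source.initialize(connectionTransfer);
    }
}
```

In `sendStartupCommand()` this corresponds to calling `serverResource.initialize(connectionTransfer)` in place of `serverResource.initialize()`.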



##########
agent/src/main/java/com/cloud/agent/Agent.java:
##########
@@ -623,98 +559,351 @@ public Task create(final Task.Type type, final Link link, final byte[] data) {
         return new ServerHandler(type, link, data);
     }
 
-    protected void reconnect(final Link link) {
-        reconnect(link, null, false);
+    protected void closeAndTerminateLink(Link link) {
+        Optional.ofNullable(link)
+                .map(Link::attachment)
+                .filter(ServerAttache.class::isInstance)
+                .map(ServerAttache.class::cast)
+                .ifPresentOrElse(ServerAttache::disconnect, () -> {
+                    if (link != null) {
+                        link.close();
+                        link.terminated();
+                    }
+                });
     }
 
-    protected void reconnect(final Link link, String preferredMSHost, boolean forTransfer) {
-        if (!(forTransfer || reconnectAllowed)) {
-            logger.debug("Reconnect requested but it is not allowed {}", () -> getLinkLog(link));
+    protected void stopAndCleanupConnection() {
+        if (connection == null) {
             return;
         }
-        cancelStartupTask();
-        closeAndTerminateLink(link);
-        closeAndTerminateLink(this.link);
-        setLink(null);
-        cancelTasks();
-        serverResource.disconnected();
-        logger.info("Lost connection to host: {}. Attempting reconnection while we still have {} commands in progress.", shell.getConnectedHost(), commandsInProgress.get());
-        stopAndCleanupConnection(true);
-        String host = preferredMSHost;
-        if (org.apache.commons.lang3.StringUtils.isBlank(host)) {
-            host = shell.getNextHost();
-        }
-        List<String> avoidMSHostList = shell.getAvoidHosts();
-        do {
-            if (CollectionUtils.isEmpty(avoidMSHostList) || !avoidMSHostList.contains(host)) {
-                connection = new NioClient(getAgentName(), host, shell.getPort(), shell.getWorkers(), shell.getSslHandshakeTimeout(), this);
-                logger.info("Reconnecting to host: {}", host);
-                try {
-                    connection.start();
-                } catch (final NioConnectionException e) {
-                    logger.info("Attempted to re-connect to the server, but received an unexpected exception, trying again...", e);
-                    stopAndCleanupConnection(false);
-                }
+        NioConnection connection = this.connection;
+        connection.stop();
+        try {
+            connection.cleanUp();
+        } catch (final IOException e) {
+            logger.warn("Fail to clean up old connection", e);
+        }
+
+        try {
+            while (connection.isStartup()) {
+                logger.debug("Waiting for connection graceful stop");
+                shell.getBackoffAlgorithm().waitBeforeRetry();
+                connection.stop();
             }
-            shell.getBackoffAlgorithm().waitBeforeRetry();
-            host = shell.getNextHost();
-        } while (!connection.isStartup());
-        shell.updateConnectedHost(((NioClient)connection).getHost());
-        logger.info("Connected to the host: {}", shell.getConnectedHost());
+        } catch (Exception e) {
+            logger.warn("Failed to gracefully stop connection", e);
+        }
+        logger.debug("Connection stopped");
+    }
+
+    /**
+     * Select the host to reconnect to based on priority:
+     * 1. preferredHost if defined and not blank
+     * 2. Link's socket address IP if available and not null
+     * 3. shell.getNextHost() if the above two options are not met
+     *
+     * @param preferredHost the preferred host to connect to
+     * @param link the current link which may contain socket address information
+     * @return the host to connect to
+     */
+    protected String selectReconnectionHost(String preferredHost, Link link) {
+        return Optional.ofNullable(preferredHost)
+                .filter(org.apache.commons.lang3.StringUtils::isNotBlank)
+                .orElseGet(() -> Optional.ofNullable(link)
+                        .map(Link::getSocketAddress)
+                        .map(InetSocketAddress::getAddress)
+                        .map(InetAddress::getHostAddress)
+                        .orElseGet(shell::getNextHost));
     }
 
-    protected void closeAndTerminateLink(final Link link) {
-        if (link == null) {
+    /**
+     * Reconnect to Management Server.
+     *
+     * @param link           - connection holder
+     * @param preferredHost  - if defined, reconnect will be performed to this Host first,
+     *                       otherwise will be used {@link IAgentShell#getNextHost()}
+     * @param forceReconnect - expected to be true if called by {@link MigrateAgentConnectionCommand},
+     *                       this is only "switch Management Server", it does not perform full Host Connect process.
+     */
+    protected void reconnect(Link link, String preferredHost, boolean forceReconnect) {
+        if (!reconnectLock.compareAndSet(false, true)) {
+            logger.warn("Reconnect is already running, exiting");
             return;
         }
-        link.close();
-        link.terminated();
+        String requestedLink = Optional.ofNullable(link).map(Link::toString).orElse("N/A");
+        String currentLink = Optional.ofNullable(this.link).map(Link::toString).orElse("N/A");
+        logger.info("Reconnect info: provided link: {}, agent link: {}, preferred host: {}, force" +
+                " reconnect: {}", requestedLink, currentLink, preferredHost, forceReconnect);
+
+        try {
+            logger.debug("Obtained reconnect lock");
+            if (!(forceReconnect || reconnectAllowed)) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Reconnect requested but it is not allowed {}", link);
+                }
+                return;
+            }
+
+            if (isReconnectStormDetected(link, preferredHost, requestedLink, currentLink)) {
+                return;
+            }
+
+            cleanupConnectionBeforeReconnect(link);
+            // start with preferred host
+            String host = selectReconnectionHost(preferredHost, link);
+
+            String hostLog = LogUtils.getHostLog(host, shell.getPort());
+            List<String> avoidMsHostList = Optional.ofNullable(shell.getAvoidHosts()).orElseGet(List::of);
+            // pointer to the first element of "refuse loop"
+            AtomicReference<String> firstRefuseLoopHostRef = new AtomicReference<>(null);
+            // to break deadlock where "non-avoid" MS Hosts are down and only "avoid" are up
+            AtomicBoolean ignoreAvoidMsHostListRef = new AtomicBoolean(false);
+            do {
+                AtomicBoolean skipTimeoutRef = new AtomicBoolean(false);
+                String parentLogContextId = (String) ThreadContext.get("logcontextid");
+                if (parentLogContextId != null) {
+                    ThreadContext.put("logcontextid-parent", parentLogContextId);
+                }
+                ThreadContext.put("logcontextid", UuidUtils.first(UUID.randomUUID().toString()));
+                if (ignoreAvoidMsHostListRef.get() || !avoidMsHostList.contains(host)) {
+                    connection = new NioClient(getAgentName(), host, shell.getPort(), shell.getWorkers(),
+                            shell.getSslHandshakeTimeout(), this);
+                    logger.info("Reconnecting to host: {}", hostLog);
+                    try {
+                        connection.start();
+                        // successfully connected, skip the rest
+                        continue;
+                    } catch (Exception e) {
+                        logReconnectionFailure(e, hostLog);
+
+                        try {
+                            stopAndCleanupConnection();
+                        } catch (Exception ex) {
+                            logger.warn("Got an exception during stop and cleanup connection", e);
+                        }
+
+                        updateRefuseLoopState(e, host, firstRefuseLoopHostRef, ignoreAvoidMsHostListRef, skipTimeoutRef);
+                    }
+                } else {
+                    logger.debug("Next host {} is in avoid list, skipped", hostLog);
+                    if (org.apache.commons.lang3.StringUtils.isBlank(preferredHost)) {
+                        logHostLists(avoidMsHostList);
+                        skipTimeoutRef.set(true);
+                    }
+                }
+                if (!skipTimeoutRef.get()) {
+                    shell.getBackoffAlgorithm().waitBeforeRetry();
+                }
+                host = shell.getNextHost();
+                hostLog = LogUtils.getHostLog(host, shell.getPort());
+                logger.debug("Next host to connect: {}", hostLog);
+            } while (!connection.isStartup());
+            // successfully connected
+            shell.updateConnectedHost(((NioClient) connection).getHost());
+            String msg = String.format("Connected to the host: %s (%s)", shell.getConnectedHost(), this.link);
+            logger.info(msg);
+        } finally {
+            reconnectLock.set(false);
+            logger.debug("Removed reconnect lock");
+        }
     }
 
-    protected void stopAndCleanupConnection(boolean waitForStop) {
-        if (connection == null) {
-            return;
+    /**
+     * Handles "Connection refused" loop detection and determines if backoff timeout should be skipped.
+     * Manages refuse loop state to detect when all management servers have been tried and need
+     * to ignore avoid list to prevent deadlock.
+     *
+     * @param e the exception from connection attempt
+     * @param host the current host being attempted
+     * @param firstRefuseLoopHostRef reference to first host in refuse loop (modified by this method)
+     * @param ignoreAvoidMsHostListRef flag to ignore avoid list (modified by this method)
+     * @return true if timeout should be skipped (connection refused), false otherwise
+     */
+    private void updateRefuseLoopState(Exception e, String host, AtomicReference<String> firstRefuseLoopHostRef, AtomicBoolean ignoreAvoidMsHostListRef, AtomicBoolean skipTimeoutRef) {
+        // we are skipping timeout for "Connection refused" to not waste time on down MS
+        boolean skipTimeout = Optional.ofNullable(e.getCause())
+                .filter(ConnectException.class::isInstance)
+                .map(Throwable::getMessage)
+                .filter(CONNECTION_REFUSED_MSG::equalsIgnoreCase)
+                .isPresent();
+        skipTimeoutRef.set(skipTimeout);
+        String firstRefuseLoopHost = firstRefuseLoopHostRef.get();
+        // for each "Connection refused" (maybe need to have a copy of variable with better name)
+        // start "refuse loop"
+        if (skipTimeout && firstRefuseLoopHost == null) {
+            firstRefuseLoopHostRef.set(host);
+            ignoreAvoidMsHostListRef.set(false);
+            logger.debug("Started refuse loop for host {}", firstRefuseLoopHost);
+            // closed "refuse loop"
+        } else if (skipTimeout && firstRefuseLoopHost.equalsIgnoreCase(host)) {
+            ignoreAvoidMsHostListRef.set(true);
+            logger.debug("Closed refuse loop for host {}", firstRefuseLoopHost);
+            // got non "refuse" related issue, break "refuse loop"
+        } else if (!skipTimeout && (firstRefuseLoopHostRef != null || ignoreAvoidMsHostListRef.get())) {
+            logger.debug("Broke refuse loop for host {} by {}", firstRefuseLoopHost, host);
+            firstRefuseLoopHostRef.set(null);
+            ignoreAvoidMsHostListRef.set(false);
         }
-        connection.stop();
+    }
+
+    /**
+     * Logs reconnection failure with appropriate level based on rejection reason.
+     * If connection was rejected due to max concurrent connections limit (Broken pipe),
+     * logs as warning. Otherwise logs as info.
+     *
+     * @param e the exception that occurred during reconnection attempt
+     * @param hostLog the formatted host log string for logging
+     */
+    private void logReconnectionFailure(Exception e, String hostLog) {
+        // check if got NIO Connection exception, caused by IO Exception "Broken pipe"
+        boolean rejectedByMs = Optional.of(e).filter(NioConnectionException.class::isInstance)
+                .map(Exception::getCause)
+                .filter(IOException.class::isInstance)
+                .map(IOException.class::cast)
+                .map(IOException::getMessage)
+                .filter(BROKEN_PIPE_MSG::equalsIgnoreCase)
+                .isPresent();
+        if (rejectedByMs) {
+            logger.warn("Attempted to re-connect to {}, but rejected" +
+                    " due to 'agent.max.concurrent.new.connections' reached limit," +
+                    " will try again", hostLog, e);
+        } else {
+            logger.info("Attempted to re-connect to {}, but got exception," +
+                    " will try again", hostLog, e);
+        }
+    }
+
+    /**
+     * Logs all management server host lists for debugging reconnection logic.
+     * Outputs defined hosts, hosts to avoid, and calculated available hosts.
+     *
+     * @param avoidMsHostList list of management server hosts to avoid during 
reconnection
+     */
+    private void logHostLists(List<String> avoidMsHostList) {
+        logger.debug("Preferred host is not defined");
         try {
-            connection.cleanUp();
-        } catch (final IOException e) {
-            logger.warn("Fail to clean up old connection. {}", e);
+            List<String> hostsList = Optional.ofNullable(shell.getHosts())
+                    .map(Arrays::asList)
+                    .orElseGet(List::of);
+
+            List<String> hostsShortList = new ArrayList<>(hostsList);
+            hostsShortList.removeAll(avoidMsHostList);
+
+            logger.info("Defined hosts: {} Avoid hosts: {} Available hosts: {}",
+                    String.join(", ", hostsList), String.join(", ", avoidMsHostList), String.join(", ", hostsShortList));
+        } catch (Exception e) {
+            logger.warn("Failed to calculate next host logic", e);
         }
-        if (!waitForStop) {
-            return;
+    }
+
+    /**
+     * Cleans up current connection state before attempting reconnection.
+     * Stops host connect process, terminates links, cancels scheduled tasks,
+     * notifies server resource about disconnection, and resets connection tracking.
+     *
+     * @param link the link that triggered reconnection
+     */
+    private void cleanupConnectionBeforeReconnect(Link link) {
+        String lastConnectedHost = shell.getConnectedHost();
+        try {
+            // reset Host status track and Startup process initiating
+            logger.debug("Stopping Host Connect process");
+            hostConnectProcess.stop();
+            closeAndTerminateLink(link);
+            closeAndTerminateLink(this.link);
+            setLink(null);
+            cancelTasks();
+            serverResource.disconnected();
+            stopAndCleanupConnection();
+            shell.updateConnectedHost(null);
+        } catch (Exception ex) {
+            logger.error("Failed to cleanup previous connection", ex);
+        }
+        logger.info("Lost connection to host: {}. Attempting reconnection while we still have" +
+                " {} commands in progress.", lastConnectedHost, commandsInProgress.get());
+    }
+
+    /**
+     * Detects reconnection storm by checking if the reconnect request is redundant.
+     * This prevents processing stale reconnection requests for old links when
+     * agent has already established a new connection.
+     *
+     * @param link the link requesting reconnection
+     * @param preferredHost the preferred host to reconnect to (may be null)
+     * @param requestedLink string representation of the requested link for logging
+     * @param currentLink string representation of the current agent link for logging
+     * @return true if reconnection storm is detected and request should be skipped, false otherwise
+     */
+    private boolean isReconnectStormDetected(Link link, String preferredHost, String requestedLink, String currentLink) {
+        logger.debug("Calling storm guard");
+        boolean reconnectForCurrentLink = link == this.link;
+        boolean currentLinkTerminated = this.link != null && this.link.isTerminated();
+        boolean reconnectForNewHost = this.hostname != null && this.hostname.equals(preferredHost);
+        // if none of the above is true
+        boolean stormDetected = ! (reconnectForCurrentLink || currentLinkTerminated || reconnectForNewHost);

Review Comment:
   isReconnectStormDetected() treats a reconnect as "not a storm" when preferredHost equals this.hostname (the agent's own hostname). preferredHost is a Management Server host, so this comparison does not detect a reconnect to a new host: it can cause legitimate reconnects to be skipped as a storm or, if the two names happen to match, let a storm through.
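
   One possible fix, sketched as a self-contained Java class under stated assumptions: compare preferredHost with the management server the agent is currently connected to, not with the agent's own hostname. StormGuardSketch and the connectedMsHost parameter are hypothetical (connectedMsHost might be fed from shell.getConnectedHost(), assuming that returns the current MS address), and the Link checks are reduced to booleans so the example runs on its own. This is not the PR's code.

```java
// Hypothetical sketch of the storm guard; not the PR's implementation.
final class StormGuardSketch {

    private StormGuardSketch() {
    }

    /**
     * @param reconnectForCurrentLink true if the request came from the agent's current link
     * @param currentLinkTerminated   true if the current link exists and is already terminated
     * @param preferredHost           management server requested for reconnection (may be null)
     * @param connectedMsHost         assumed: management server currently connected to (may be null)
     * @return true if the request looks stale and should be skipped
     */
    static boolean isReconnectStormDetected(boolean reconnectForCurrentLink,
                                            boolean currentLinkTerminated,
                                            String preferredHost,
                                            String connectedMsHost) {
        // Assumption: a reconnect is legitimate when it targets a management
        // server other than the one the agent is connected to right now.
        // equalsIgnoreCase(null) returns false, so a null connectedMsHost counts as "new".
        boolean reconnectForNewHost = preferredHost != null
                && !preferredHost.equalsIgnoreCase(connectedMsHost);
        // A storm is a request for which none of the legitimate reasons hold.
        return !(reconnectForCurrentLink || currentLinkTerminated || reconnectForNewHost);
    }

    public static void main(String[] args) {
        // Stale request from an old, still-live link with no new target: skipped.
        System.out.println(isReconnectStormDetected(false, false, null, "10.0.0.1"));       // true
        // Request to move to a different management server: allowed.
        System.out.println(isReconnectStormDetected(false, false, "10.0.0.2", "10.0.0.1")); // false
        // Request for the current link: allowed.
        System.out.println(isReconnectStormDetected(true, false, null, "10.0.0.1"));        // false
    }
}
```

   Keeping the three conditions as named booleans, as the PR already does, keeps the decision easy to log next to requestedLink and currentLink.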



##########
utils/src/main/java/com/cloud/utils/LogUtils.java:
##########
@@ -106,4 +108,29 @@ public static String logGsonWithoutException(String formatMessage, Object ... ob
             return errorMsg;
         }
     }
+
+    /**
+     * Generates address entry for log in format of {@code IP_ADDRESS/HOST_NAME:PORT}, where {@code HOST_NAME} is
+     * optional if it cannot be resolved.
+     *
+     * @param address IP address or Host name
+     * @param port    port
+     */
+    public static String getHostLog(String address, Integer port) {
+        try {
+            InetAddress inetAddress = InetAddress.getByName(address);
+            String hostName = inetAddress.getHostName();
+            String ipAddress = inetAddress.getHostAddress();
+            if (port == null) {
+                return String.format("%s/%s", ipAddress, hostName);
+            }
+            return String.format("%s/%s:%s", ipAddress, hostName, port);
+        } catch (UnknownHostException e) {
+            LOGGER.warn("Failed to resolve name for address {}", address, e);
+        }
+        if (port == null) {
+            return address;
+        }
+        return String.format("%s:%s", address, port);
+    }

Review Comment:
   LogUtils.getHostLog() performs DNS lookups: InetAddress.getByName() resolves host names, and getHostName() does a reverse lookup when the address was given as an IP literal. It is called while logging connection and reconnection attempts (e.g., in NioClient.init and the Agent.reconnect loop), so slow or blocked DNS can materially delay reconnect attempts. Consider avoiding the reverse lookup and logging only the resolved IP together with the original host string.
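
   A minimal sketch of that suggestion, assuming the log format may change from IP_ADDRESS/HOST_NAME:PORT to ADDRESS[:PORT] for IP literals and HOST/IP[:PORT] for host names. For an IP literal, InetAddress.getByName() only parses the string, while for a host name it still performs a forward lookup, so this removes the reverse lookup but not all DNS traffic. HostLogSketch is a hypothetical stand-in for LogUtils, and the PR's LOGGER.warn call is omitted to keep the example dependency-free.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical stand-in for LogUtils.getHostLog(); not the PR's implementation.
final class HostLogSketch {

    private HostLogSketch() {
    }

    /**
     * Returns ADDRESS[:PORT], or ADDRESS/IP[:PORT] when ADDRESS is a host name
     * that resolves to a different textual IP. No reverse lookup is performed.
     */
    static String getHostLog(String address, Integer port) {
        String label = address;
        if (address != null && !address.isBlank()) {
            try {
                // IP literals are parsed locally; host names trigger a forward lookup.
                String ip = InetAddress.getByName(address).getHostAddress();
                if (!ip.equals(address)) {
                    label = address + "/" + ip;
                }
            } catch (UnknownHostException e) {
                // Keep the original string: a resolution failure must not break logging.
            }
        }
        return port == null ? label : label + ":" + port;
    }

    public static void main(String[] args) {
        System.out.println(getHostLog("10.0.0.1", 8250)); // 10.0.0.1:8250
        System.out.println(getHostLog("10.0.0.1", null)); // 10.0.0.1
    }
}
```

   With an IP literal such as 10.0.0.1, the sketch never touches DNS, which is the common case when management servers are configured by address.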



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

